瀏覽代碼

maint: refine ScriptModeSession

wangweimin 5 年之前
父節點
當前提交
55d4790e35
共有 2 個文件被更改,包括 21 次插入37 次删除
  1. 8 6
      pywebio/session/coroutinebased.py
  2. 13 31
      pywebio/session/threadbased.py

+ 8 - 6
pywebio/session/coroutinebased.py

@@ -75,7 +75,8 @@ class CoroutineBasedSession(AbstractSession):
         assert iscoroutinefunction(target) or isgeneratorfunction(target), ValueError(
             "CoroutineBasedSession accept coroutine function or generator function as task function")
 
-        CoroutineBasedSession._active_session_cnt += 1
+        cls = type(self)
+        cls._active_session_cnt += 1
 
         self.info = session_info
         self._on_task_command = on_task_command or (lambda _: None)
@@ -89,8 +90,8 @@ class CoroutineBasedSession(AbstractSession):
 
         # 在创建第一个CoroutineBasedSession时 event_loop_thread_id 还未被初始化
         # 则当前线程即为运行 event loop 的线程
-        if CoroutineBasedSession.event_loop_thread_id is None:
-            CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
+        if cls.event_loop_thread_id is None:
+            cls.event_loop_thread_id = threading.current_thread().ident
 
         # 会话内的协程任务
         self.coros = {}  # coro_task_id -> Task()
@@ -157,7 +158,7 @@ class CoroutineBasedSession(AbstractSession):
             t.step(SessionClosedException, throw_exp=True)
             t.close()
         self.coros = {}  # delete session tasks
-        CoroutineBasedSession._active_session_cnt -= 1
+        type(self)._active_session_cnt -= 1
 
     def close(self):
         """关闭当前Session。由Backend调用"""
@@ -222,9 +223,10 @@ class CoroutineBasedSession(AbstractSession):
                     else:
                         self.run_async(coro)
 
-        callback_task = Task(callback_coro(), CoroutineBasedSession.get_current_session())
+        cls = type(self)
+        callback_task = Task(callback_coro(), cls.get_current_session())
         callback_task.coro.send(None)  # 激活,Non't callback.step() ,导致嵌套调用step  todo 与inactive_coro_instances整合
-        CoroutineBasedSession.get_current_session().coros[callback_task.coro_id] = callback_task
+        cls.get_current_session().coros[callback_task.coro_id] = callback_task
 
         return callback_task.coro_id
 

+ 13 - 31
pywebio/session/threadbased.py

@@ -15,10 +15,9 @@ logger = logging.getLogger(__name__)
 """
 基于线程的会话实现
 
-主任务线程退出后,连接关闭,但不会清理主任务线程产生的其他线程
-
-客户端连接关闭后,后端线程不会退出,但是再次调用输入输出函数会引发异常
-todo: thread 重名
+主任务线程退出后,连接关闭。
+正在等待PyWebIO输入的线程会在输入函数中抛出SessionClosedException异常,
+其他线程若调用PyWebIO输入输出函数会引发异常SessionException
 """
 
 
@@ -41,6 +40,7 @@ class ThreadBasedSession(AbstractSession):
         curr = id(threading.current_thread())
         session = cls.thread2session.get(curr)
         if session is None:
+            logger.debug("SessionNotFoundException in %s", threading.current_thread())
             raise SessionNotFoundException(
                 "Can't find current session. Maybe session closed. Did you forget to use `register_thread` ?")
         return session
@@ -57,17 +57,17 @@ class ThreadBasedSession(AbstractSession):
 
     def __init__(self, target, session_info, on_task_command=None, on_session_close=None, loop=None):
         """
-        :param target: 会话运行的函数
+        :param target: 会话运行的函数. 为None时表示Script mode
         :param on_task_command: 当Task内发送Command给session的时候触发的处理函数
         :param on_session_close: 会话结束的处理函数
         :param loop: 事件循环。若 on_task_command 或者 on_session_close 中有调用使用asyncio事件循环的调用,
             则需要事件循环实例来将回调在事件循环的线程中执行
         """
-        assert (not iscoroutinefunction(target)) and (not isgeneratorfunction(target)), ValueError(
+        assert target is None or (not iscoroutinefunction(target)) and (not isgeneratorfunction(target)), ValueError(
             "ThreadBasedSession only accept a simple function as task function, "
             "not coroutine function or generator function. ")
 
-        ThreadBasedSession._active_session_cnt += 1
+        type(self)._active_session_cnt += 1
 
         self.info = session_info
         self._on_task_command = on_task_command or (lambda _: None)
@@ -88,7 +88,8 @@ class ThreadBasedSession(AbstractSession):
         self.callback_thread = None
         self.callbacks = {}  # callback_id -> (callback_func, is_mutex)
 
-        self._start_main_task(target)
+        if target is not None:
+            self._start_main_task(target)
 
     def _start_main_task(self, target):
 
@@ -169,6 +170,7 @@ class ThreadBasedSession(AbstractSession):
             self._on_session_close()
 
     def _cleanup(self):
+        cls = type(self)
 
         self.unhandled_task_msgs.wait_empty(8)
         if not self.unhandled_task_msgs.empty():
@@ -176,7 +178,7 @@ class ThreadBasedSession(AbstractSession):
             raise RuntimeError('There are unhandled task messages when session close!')
 
         for t in self.threads:
-            del ThreadBasedSession.thread2session[id(t)]
+            del cls.thread2session[id(t)]
 
         if self.callback_mq is not None:  # 回调功能已经激活
             self.callback_mq.put(None)  # 结束回调线程
@@ -186,7 +188,7 @@ class ThreadBasedSession(AbstractSession):
 
         self.task_mqs = {}
 
-        ThreadBasedSession._active_session_cnt -= 1
+        cls._active_session_cnt -= 1
 
     def close(self):
         """关闭当前Session。由Backend调用"""
@@ -320,7 +322,6 @@ class ScriptModeSession(ThreadBasedSession):
 
     def __init__(self, thread, session_info, on_task_command=None, loop=None):
         """
-
         :param thread: 第一次调用PyWebIO交互函数的线程 todo 貌似本参数并不必要
         :param on_task_command: 会话结束的处理函数。后端Backend在相应on_session_close时关闭连接时,
             需要保证会话内的所有消息都传送到了客户端
@@ -331,26 +332,7 @@ class ScriptModeSession(ThreadBasedSession):
             raise RuntimeError("ScriptModeSession can only be created once.")
         ScriptModeSession.instance = self
 
-        ThreadBasedSession._active_session_cnt += 1
-
-        self.info = session_info
-        self._on_task_command = on_task_command or (lambda _: None)
-        self._on_session_close = lambda: None
-        self._loop = loop
-
-        # 会话结束时运行的函数
-        self.deferred_functions = []
-
-        self.threads = []  # 当前会话的线程
-        self.unhandled_task_msgs = LimitedSizeQueue(maxsize=self.unhandled_task_mq_maxsize)
-
-        self.task_mqs = {}  # task_id -> event msg queue
-        self._closed = False
-
-        # 用于实现回调函数的注册
-        self.callback_mq = None
-        self.callback_thread = None
-        self.callbacks = {}  # callback_id -> (callback_func, is_mutex)
+        super().__init__(target=None, session_info=session_info, on_task_command=on_task_command, loop=loop)
 
         tid = id(thread)
         event_mq = queue.Queue(maxsize=self.event_mq_maxsize)