Browse Source

feat: raise SessionClosedException when session close by user

wangweimin 5 năm trước cách đây
mục cha
commit
c6da57e1a3

+ 3 - 2
pywebio/platform/flask.py

@@ -135,8 +135,9 @@ def _webio_view(target, session_cls, session_expire_seconds, session_cleanup_int
         webio_session = _webio_sessions[webio_session_id]
 
     if request.method == 'POST':  # client push event
-        webio_session.send_client_event(request.json)
-        time.sleep(WAIT_MS_ON_POST / 1000.0)
+        if request.json is not None:
+            webio_session.send_client_event(request.json)
+            time.sleep(WAIT_MS_ON_POST / 1000.0)
     elif request.method == 'GET':  # client pull messages
         pass
 

+ 2 - 1
pywebio/platform/tornado.py

@@ -81,7 +81,8 @@ def _webio_handler(target, session_cls, check_origin_func=_is_same_site):
 
         def on_message(self, message):
             data = json.loads(message)
-            self.session.send_client_event(data)
+            if data is not None:
+                self.session.send_client_event(data)
 
         def close_from_session(self):
             self._close_from_session_tag = True

+ 1 - 0
pywebio/session/base.py

@@ -56,6 +56,7 @@ class AbstractSession:
         raise NotImplementedError
 
     def next_client_event(self) -> dict:
+        """获取来自客户端的下一个事件。阻塞调用,若在等待过程中,会话被用户关闭,则抛出SessionClosedException异常"""
         raise NotImplementedError
 
     def send_client_event(self, event):

+ 14 - 2
pywebio/session/coroutinebased.py

@@ -6,7 +6,7 @@ import traceback
 from contextlib import contextmanager
 
 from .base import AbstractSession
-from ..exceptions import SessionNotFoundException, SessionClosedException
+from ..exceptions import SessionNotFoundException, SessionClosedException, SessionException
 from ..utils import random_str, isgeneratorfunction, iscoroutinefunction, catch_exp_call
 
 logger = logging.getLogger(__name__)
@@ -47,6 +47,10 @@ class CoroutineBasedSession(AbstractSession):
         if _context.current_session is None or \
                 _context.current_session.session_thread_id != threading.current_thread().ident:
             raise SessionNotFoundException("No session found in current context!")
+
+        if _context.current_session.closed():
+            raise SessionClosedException
+
         return _context.current_session
 
     @staticmethod
@@ -117,7 +121,13 @@ class CoroutineBasedSession(AbstractSession):
         self._on_task_command(self)
 
     async def next_client_event(self):
+        # 函数开始不需要判断 self.closed()
+        # 如果会话关闭,对 get_current_session().next_client_event() 的调用会抛出SessionClosedException
+
         res = await WebIOFuture()
+        if res is None:
+            raise SessionClosedException
+
         return res
 
     def send_client_event(self, event):
@@ -140,6 +150,7 @@ class CoroutineBasedSession(AbstractSession):
 
     def _cleanup(self):
         for t in list(self.coros.values()):  # t.close() may cause self.coros changed size
+            t.step(None)  # 接收端接收到None消息会抛出SessionClosedException异常
             t.close()
         self.coros = {}  # delete session tasks
         CoroutineBasedSession._active_session_cnt -= 1
@@ -304,7 +315,8 @@ class Task:
                 logger.debug('Task[%s] finished', self.coro_id)
                 self.on_coro_stop(self)
             except Exception as e:
-                self.session.on_task_exception()
+                if not isinstance(e, SessionException):
+                    self.session.on_task_exception()
                 self.task_closed = True
                 self.on_coro_stop(self)
 

+ 19 - 5
pywebio/session/threadbased.py

@@ -6,7 +6,7 @@ import traceback
 from functools import wraps
 
 from .base import AbstractSession
-from ..exceptions import SessionNotFoundException, SessionClosedException
+from ..exceptions import SessionNotFoundException, SessionClosedException, SessionException
 from ..utils import random_str, LimitedSizeQueue, isgeneratorfunction, iscoroutinefunction, catch_exp_call
 
 logger = logging.getLogger(__name__)
@@ -95,12 +95,16 @@ class ThreadBasedSession(AbstractSession):
             try:
                 target()
             except Exception as e:
-                self.on_task_exception()
+                if not isinstance(e, SessionException):
+                    self.on_task_exception()
             finally:
                 for t in self.threads:
                     if t.is_alive() and t is not threading.current_thread():
                         t.join()
-                self.send_task_command(dict(command='close_session'))
+                try:
+                    self.send_task_command(dict(command='close_session'))
+                except SessionClosedException:
+                    pass
                 self._trigger_close_event()
                 self.close()
 
@@ -126,9 +130,15 @@ class ThreadBasedSession(AbstractSession):
             self._on_task_command(self)
 
     def next_client_event(self):
+        # 函数开始不需要判断 self.closed()
+        # 如果会话关闭,对 get_current_session().next_client_event() 的调用会抛出SessionNotFoundException
+
         task_id = self.get_current_task_id()
         event_mq = self.get_current_session().task_mqs.get(task_id)
-        return event_mq.get()
+        event = event_mq.get()
+        if event is None:
+            raise SessionClosedException
+        return event
 
     def send_client_event(self, event):
         """向会话发送来自用户浏览器的事件️
@@ -157,7 +167,6 @@ class ThreadBasedSession(AbstractSession):
             self._on_session_close()
 
     def _cleanup(self):
-        self.task_mqs = {}
 
         self.unhandled_task_msgs.wait_empty(8)
         if not self.unhandled_task_msgs.empty():
@@ -170,6 +179,11 @@ class ThreadBasedSession(AbstractSession):
         if self.callback_mq is not None:  # 回调功能已经激活
             self.callback_mq.put(None)  # 结束回调线程
 
+        for mq in self.task_mqs.values():
+            mq.put(None)  # 消费端接收到None消息会抛出SessionClosedException异常
+
+        self.task_mqs = {}
+
         ThreadBasedSession._active_session_cnt -= 1
 
     def close(self):