瀏覽代碼

feat: limit threadbased session's task_msg_queue maxsize

wangweimin 5 年之前
父節點
當前提交
b0327904fb
共有 2 個文件被更改,包括 39 次插入11 次删除
  1. 7 11
      pywebio/session/threadbased.py
  2. 32 0
      pywebio/utils.py

+ 7 - 11
pywebio/session/threadbased.py

@@ -8,7 +8,7 @@ import traceback
 
 from .base import AbstractSession
 from ..exceptions import SessionNotFoundException, SessionClosedException
-from ..utils import random_str
+from ..utils import random_str, LimitedSizeQueue
 
 logger = logging.getLogger(__name__)
 
@@ -26,6 +26,7 @@ todo: thread 重名
 class ThreadBasedSession(AbstractSession):
     thread2session = {}  # thread_id -> session
 
+    unhandled_task_mq_maxsize = 1000
     event_mq_maxsize = 100
     callback_mq_maxsize = 100
 
@@ -71,9 +72,8 @@ class ThreadBasedSession(AbstractSession):
         self._on_session_close = on_session_close or (lambda: None)
         self._loop = loop
 
-        self._server_msg_lock = threading.Lock()
         self.threads = []  # 注册到当前会话的线程集合
-        self.unhandled_task_msgs = []
+        self.unhandled_task_msgs = LimitedSizeQueue(maxsize=self.unhandled_task_mq_maxsize)
 
         self.task_mqs = {}  # task_id -> event msg queue
         self._closed = False
@@ -111,8 +111,7 @@ class ThreadBasedSession(AbstractSession):
 
         :param dict command: 消息
         """
-        with self._server_msg_lock:
-            self.unhandled_task_msgs.append(command)
+        self.unhandled_task_msgs.put(command)
 
         if self._loop:
             self._loop.call_soon_threadsafe(self._on_task_command, self)
@@ -141,10 +140,7 @@ class ThreadBasedSession(AbstractSession):
         mq.put(event)
 
     def get_task_commands(self):
-        with self._server_msg_lock:
-            msgs = self.unhandled_task_msgs
-            self.unhandled_task_msgs = []
-        return msgs
+        return self.unhandled_task_msgs.get()
 
     def _trigger_close_event(self):
         """触发Backend on_session_close callback"""
@@ -156,8 +152,8 @@ class ThreadBasedSession(AbstractSession):
     def _cleanup(self):
         self.task_mqs = {}
 
-        # Don't clean unhandled_task_msgs, it may not send to client
-        # self.unhandled_task_msgs = []
+        if not self.unhandled_task_msgs.empty():
+            raise RuntimeError('There are unhandled task msgs when session close!')
 
         for t in self.threads:
             del ThreadBasedSession.thread2session[id(t)]

+ 32 - 0
pywebio/utils.py

@@ -5,6 +5,7 @@ import string
 import time
 from collections import OrderedDict
 from contextlib import closing
+import queue
 
 from os.path import abspath, dirname
 
@@ -13,6 +14,37 @@ project_dir = dirname(abspath(__file__))
 STATIC_PATH = '%s/html' % project_dir
 
 
+class LimitedSizeQueue(queue.Queue):
+    """
+    有限大小的队列
+
+    `get()` 返回全部数据
+    队列满时,再 `put()` 会阻塞
+    """
+    def get(self):
+        """获取队列全部数据"""
+        try:
+            return super().get(block=False)
+        except queue.Empty:
+            return []
+
+    def _init(self, maxsize):
+        self.queue = []
+
+    def _qsize(self):
+        return len(self.queue)
+
+    # Put a new item in the queue
+    def _put(self, item):
+        self.queue.append(item)
+
+    # Get an item from the queue
+    def _get(self):
+        all_data = self.queue
+        self.queue = []
+        return all_data
+
+
 async def wait_host_port(host, port, duration=10, delay=2):
     """Repeatedly try if a port on a host is open until duration seconds passed