Browse Source

feat: `session.defer_call(func)` make func invoked when session close

wangweimin 5 years ago
parent
commit
330f76dc57

+ 1 - 1
pywebio/__init__.py

@@ -2,7 +2,7 @@ from .platform import start_server
 from . import input
 from . import output
 from .session import (
-    run_async, run_asyncio_coroutine, register_thread,
+    run_async, run_asyncio_coroutine, register_thread, defer_call
 )
 from .exceptions import SessionException, SessionClosedException, SessionNotFoundException
 from .utils import STATIC_PATH

+ 40 - 18
pywebio/platform/flask.py

@@ -21,6 +21,7 @@ Flask backend
 """
 import asyncio
 import fnmatch
+import logging
 import threading
 import time
 from functools import partial
@@ -33,14 +34,16 @@ from ..session import CoroutineBasedSession, get_session_implement, AbstractSess
 from ..utils import STATIC_PATH
 from ..utils import random_str, LRUDict
 
+logger = logging.getLogger(__name__)
+
 # todo: use lock to avoid thread race condition
 
 # type: Dict[str, AbstractSession]
 _webio_sessions = {}  # WebIOSessionID -> WebIOSession()
-_webio_expire = LRUDict()  # WebIOSessionID -> last active timestamp
+_webio_expire = LRUDict()  # WebIOSessionID -> last active timestamp。按照最后活跃时间递增排列
 
 DEFAULT_SESSION_EXPIRE_SECONDS = 60  # 超过60s会话不活跃则视为会话过期
-REMOVE_EXPIRED_SESSIONS_INTERVAL = 20  # 清理过期会话间隔(秒)
+SESSIONS_CLEANUP_INTERVAL = 20  # 清理过期会话间隔(秒)
 WAIT_MS_ON_POST = 100  # 在处理完POST请求时,等待WAIT_MS_ON_POST毫秒再读取返回数据。Task的command可以立即返回
 
 _event_loop = None
@@ -51,21 +54,31 @@ def _make_response(webio_session: AbstractSession):
 
 
 def _remove_expired_sessions(session_expire_seconds):
+    logger.debug("removing expired sessions")
+    """清除当前会话列表中的过期会话"""
     while _webio_expire:
         sid, active_ts = _webio_expire.popitem(last=False)
+
         if time.time() - active_ts < session_expire_seconds:
+            # 当前session未过期
             _webio_expire[sid] = active_ts
             _webio_expire.move_to_end(sid, last=False)
             break
-        del _webio_sessions[sid]
+
+        # 清理session
+        logger.debug("session %s expired" % sid)
+        session = _webio_sessions.get(sid)
+        if session:
+            session.close()
+            del _webio_sessions[sid]
 
 
 _last_check_session_expire_ts = 0  # 上次检查session有效期的时间戳
 
 
 def _remove_webio_session(sid):
-    del _webio_sessions[sid]
-    del _webio_expire[sid]
+    _webio_sessions.pop(sid, None)
+    _webio_expire.pop(sid, None)
 
 
 def cors_headers(origin, check_origin, headers=None):
@@ -82,10 +95,13 @@ def cors_headers(origin, check_origin, headers=None):
     return headers
 
 
-def _webio_view(target, session_cls, session_expire_seconds, check_origin):
+def _webio_view(target, session_cls, session_expire_seconds, session_cleanup_interval, check_origin):
     """
-    :param target:
-    :param session_expire_seconds:
+    :param target: 任务函数
+    :param session_cls: 会话实现类
+    :param session_expire_seconds: 会话不活跃过期时间。
+    :param session_cleanup_interval: 会话清理间隔。
+    :param callable check_origin: callback(origin) -> bool
     :return:
     """
     global _last_check_session_expire_ts, _event_loop
@@ -126,9 +142,9 @@ def _webio_view(target, session_cls, session_expire_seconds, check_origin):
 
     _webio_expire[webio_session_id] = time.time()
     # clean up at intervals
-    if time.time() - _last_check_session_expire_ts > REMOVE_EXPIRED_SESSIONS_INTERVAL:
-        _remove_expired_sessions(session_expire_seconds)
+    if time.time() - _last_check_session_expire_ts > session_cleanup_interval:
         _last_check_session_expire_ts = time.time()
+        _remove_expired_sessions(session_expire_seconds)
 
     response = _make_response(webio_session)
 
@@ -142,10 +158,15 @@ def _webio_view(target, session_cls, session_expire_seconds, check_origin):
     return response
 
 
-def webio_view(target, session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS, allowed_origins=None, check_origin=None):
+def webio_view(target,
+               session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
+               session_cleanup_interval=SESSIONS_CLEANUP_INTERVAL,
+               allowed_origins=None, check_origin=None):
     """获取用于与Flask进行整合的view函数
 
     :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
+    :param int session_expire_seconds: 会话不活跃过期时间。
+    :param int session_cleanup_interval: 会话清理间隔。
     :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
         来源包含协议和域名和端口部分,允许使用 Unix shell 风格的匹配模式:
 
@@ -155,9 +176,6 @@ def webio_view(target, session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS, al
         - ``[!seq]`` 匹配不在seq内的字符
 
         比如 ``https://*.example.com`` 、 ``*://*.example.com``
-    :param session_expire_seconds: 会话不活跃过期时间。
-    :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
-        来源包含协议和域名和端口部分,允许使用 ``*`` 作为通配符。 比如 ``https://*.example.com`` 、 ``*://*.example.com`` 、
     :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
         返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
     :return: Flask视图函数
@@ -173,6 +191,7 @@ def webio_view(target, session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS, al
 
     view_func = partial(_webio_view, target=target, session_cls=session_cls,
                         session_expire_seconds=session_expire_seconds,
+                        session_cleanup_interval=session_cleanup_interval,
                         check_origin=check_origin)
     view_func.__name__ = 'webio_view'
     return view_func
@@ -196,6 +215,7 @@ def run_event_loop(debug=False):
 def start_server(target, port=8080, host='localhost',
                  allowed_origins=None, check_origin=None,
                  disable_asyncio=False,
+                 session_cleanup_interval=SESSIONS_CLEANUP_INTERVAL,
                  session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
                  debug=False, **flask_options):
     """启动一个 Flask server 来运行PyWebIO的 ``target`` 服务
@@ -215,18 +235,20 @@ def start_server(target, port=8080, host='localhost',
         比如 ``https://*.example.com`` 、 ``*://*.example.com``
     :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
         返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
-    :param disable_asyncio: 禁用 asyncio 函数。仅在当 ``session_type=COROUTINE_BASED`` 时有效。
+    :param bool disable_asyncio: 禁用 asyncio 函数。仅在当 ``session_type=COROUTINE_BASED`` 时有效。
         在Flask backend中使用asyncio需要单独开启一个线程来运行事件循环,
         若程序中没有使用到asyncio中的异步函数,可以开启此选项来避免不必要的资源浪费
-    :param session_expire_seconds: 会话过期时间。若 session_expire_seconds 秒内没有收到客户端的请求,则认为会话过期。
-    :param debug: Flask debug mode
+    :param int session_expire_seconds: 会话过期时间。若 session_expire_seconds 秒内没有收到客户端的请求,则认为会话过期。
+    :param int session_cleanup_interval: 会话清理间隔。
+    :param bool debug: Flask debug mode
     :param flask_options: Additional keyword arguments passed to the constructor of ``flask.Flask.run``.
         ref: https://flask.palletsprojects.com/en/1.1.x/api/?highlight=flask%20run#flask.Flask.run
     """
 
     app = Flask(__name__)
     app.route('/io', methods=['GET', 'POST', 'OPTIONS'])(
-        webio_view(target, session_expire_seconds,
+        webio_view(target, session_expire_seconds=session_expire_seconds,
+                   session_cleanup_interval=session_cleanup_interval,
                    allowed_origins=allowed_origins,
                    check_origin=check_origin)
     )

+ 4 - 0
pywebio/platform/tornado.py

@@ -234,6 +234,10 @@ def start_server_in_current_thread_session():
             )
             await asyncio.sleep(1)
 
+        # 关闭ScriptModeSession。
+        # 主动关闭ioloop时,SingleSessionWSHandler.on_close 并不会被调用,需要手动关闭session
+        SingleSessionWSHandler.session.close()
+
         # Current thread is only one none-daemonic-thread, so exit
         logger.debug('Closing tornado ioloop...')
         tornado.ioloop.IOLoop.current().stop()

+ 10 - 0
pywebio/session/__init__.py

@@ -116,3 +116,13 @@ def register_thread(thread: threading.Thread):
     :param threading.Thread thread: 线程对象
     """
     return get_current_session().register_thread(thread)
+
+
+def defer_call(func):
+    """设置会话结束时调用的函数。无论是用户主动关闭会话还是任务结束会话关闭,设置的函数都会被运行。
+    可以用于资源清理等工作。
+    在会话中可以多次调用 `defer_call()` ,会话结束后将会顺序执行设置的函数。
+
+    :param func: 话结束时调用的函数
+    """
+    return get_current_session().defer_call(func)

+ 9 - 0
pywebio/session/base.py

@@ -11,6 +11,7 @@ class AbstractSession:
         on_task_exception
         register_callback
 
+        defer_call
 
     由Backend调用:
         send_client_event
@@ -79,3 +80,11 @@ class AbstractSession:
         ``callback`` 回调函数被执行, 并传入事件消息中的 ``data`` 字段值作为参数
         """
         raise NotImplementedError
+
+    def defer_call(self, func):
+        """设置会话结束时调用的函数。可以用于资源清理。
+        在会话中可以多次调用 `defer_call()` ,会话结束后将会顺序执行设置的函数。
+
+        :param func: 话结束时调用的函数
+        """
+        raise NotImplementedError

+ 12 - 3
pywebio/session/coroutinebased.py

@@ -7,7 +7,7 @@ from contextlib import contextmanager
 
 from .base import AbstractSession
 from ..exceptions import SessionNotFoundException, SessionClosedException
-from ..utils import random_str, isgeneratorfunction, iscoroutinefunction
+from ..utils import random_str, isgeneratorfunction, iscoroutinefunction, catch_exp_call
 
 logger = logging.getLogger(__name__)
 
@@ -69,6 +69,9 @@ class CoroutineBasedSession(AbstractSession):
         self._on_task_command = on_task_command or (lambda _: None)
         self._on_session_close = on_session_close or (lambda: None)
 
+        # 会话结束时运行的函数
+        self.deferred_functions = []
+
         # 当前会话未被Backend处理的消息
         self.unhandled_task_msgs = []
 
@@ -147,7 +150,9 @@ class CoroutineBasedSession(AbstractSession):
             return
         self._closed = True
         self._cleanup()
-        # todo clean
+        while self.deferred_functions:
+            func = self.deferred_functions.pop()
+            catch_exp_call(func, logger)
 
     def closed(self):
         return self._closed
@@ -220,6 +225,10 @@ class CoroutineBasedSession(AbstractSession):
         res = await WebIOFuture(coro=coro_obj)
         return res
 
+    def defer_call(self, func):
+        """设置会话结束时调用的函数。可以用于资源清理。"""
+        self.deferred_functions.append(func)
+
 
 class TaskHandle:
     """协程任务句柄"""
@@ -329,7 +338,7 @@ class Task:
 
     def __del__(self):
         if not self.task_closed:
-            logger.warning('Task[%s] not finished when destroy', self.coro_id)
+            logger.warning('Task[%s] was destroyed but it is pending!', self.coro_id)
 
     def task_handle(self):
         handle = TaskHandle(close=self.close, closed=lambda: self.task_closed)

+ 17 - 1
pywebio/session/threadbased.py

@@ -7,7 +7,7 @@ from functools import wraps
 
 from .base import AbstractSession
 from ..exceptions import SessionNotFoundException, SessionClosedException
-from ..utils import random_str, LimitedSizeQueue, isgeneratorfunction, iscoroutinefunction
+from ..utils import random_str, LimitedSizeQueue, isgeneratorfunction, iscoroutinefunction, catch_exp_call
 
 logger = logging.getLogger(__name__)
 
@@ -72,6 +72,9 @@ class ThreadBasedSession(AbstractSession):
         self._on_session_close = on_session_close or (lambda: None)
         self._loop = loop
 
+        # 会话结束时运行的函数
+        self.deferred_functions = []
+
         self.threads = []  # 注册到当前会话的线程集合
         self.unhandled_task_msgs = LimitedSizeQueue(maxsize=self.unhandled_task_mq_maxsize)
 
@@ -171,11 +174,17 @@ class ThreadBasedSession(AbstractSession):
 
     def close(self):
         """关闭当前Session。由Backend调用"""
+        # todo self._closed 会有竞争条件
         if self._closed:
             return
         self._closed = True
+
         self._cleanup()
 
+        while self.deferred_functions:
+            func = self.deferred_functions.pop()
+            catch_exp_call(func, logger)
+
     def closed(self):
         return self._closed
 
@@ -266,6 +275,10 @@ class ThreadBasedSession(AbstractSession):
         event_mq = queue.Queue(maxsize=self.event_mq_maxsize)  # 线程内的用户事件队列
         self.task_mqs[self._get_task_id(t)] = event_mq
 
+    def defer_call(self, func):
+        """设置会话结束时调用的函数。可以用于资源清理。"""
+        self.deferred_functions.append(func)
+
 
 class ScriptModeSession(ThreadBasedSession):
     """Script mode的会话实现"""
@@ -305,6 +318,9 @@ class ScriptModeSession(ThreadBasedSession):
         self._on_session_close = lambda: None
         self._loop = loop
 
+        # 会话结束时运行的函数
+        self.deferred_functions = []
+
         self.threads = []  # 当前会话的线程
         self.unhandled_task_msgs = LimitedSizeQueue(maxsize=self.unhandled_task_mq_maxsize)
 

+ 13 - 0
pywebio/utils.py

@@ -15,6 +15,19 @@ project_dir = dirname(abspath(__file__))
 STATIC_PATH = '%s/html' % project_dir
 
 
+def catch_exp_call(func, logger):
+    """运行函数,将捕获异常记录到日志
+
+    :param func: 函数
+    :param logger: 日志
+    :return: ``func`` 返回值
+    """
+    try:
+        return func()
+    except:
+        logger.exception("Error when invoke `%s`" % func)
+
+
 def iscoroutinefunction(object):
     while isinstance(object, functools.partial):
         object = object.func