Selaa lähdekoodia

maint: refine flask backend

wangweimin 5 vuotta sitten
vanhempi
säilyke
969203c3fd
4 muutettua tiedostoa jossa 285 lisäystä ja 175 poistoa
  1. 2 1
      docs/guide.rst
  2. 4 8
      pywebio/platform/__init__.py
  3. 55 166
      pywebio/platform/flask.py
  4. 224 0
      pywebio/platform/httpbased.py

+ 2 - 1
docs/guide.rst

@@ -433,7 +433,8 @@ PyWebIO的会话实现默认是基于线程的,用户每打开一个和服务
     from flask import Flask, send_from_directory
     from pywebio import STATIC_PATH
     from pywebio.output import *
-    from pywebio.platform.flask import webio_view, run_event_loop
+    from pywebio.platform.flask import webio_view
+    from pywebio.platform.httpbased import run_event_loop
     from pywebio.session import run_asyncio_coroutine
 
     async def hello_word():

+ 4 - 8
pywebio/platform/__init__.py

@@ -11,17 +11,13 @@ Flask相关
 --------------
 
 .. autofunction:: pywebio.platform.flask.webio_view
-.. autofunction:: pywebio.platform.flask.run_event_loop
+
 .. autofunction:: pywebio.platform.flask.start_server
 
+其他
+--------------
+.. autofunction:: pywebio.platform.httpbased.run_event_loop
 
 """
 
-from . import tornado
 from .tornado import start_server
-
-try:
-    from . import flask
-except ImportError:
-    pass
-

+ 55 - 166
pywebio/platform/flask.py

@@ -2,167 +2,75 @@
 Flask backend
 
 .. note::
-    在 AsyncBasedSession 会话中,若在协程任务函数内调用 asyncio 中的协程函数,需要使用 asyncio_coroutine
-
-
-.. attention::
-    PyWebIO 的会话状态保存在进程内,所以不支持多进程部署的Flask。
-        比如使用 ``uWSGI`` 部署Flask,并使用 ``--processes n`` 选项设置了多进程;
-        或者使用 ``nginx`` 等反向代理将流量负载到多个 Flask 副本上。
-
-    A note on run Flask with uWSGI:
-
-    If you start uWSGI without threads, the Python GIL will not be enabled,
-    so threads generated by your application will never run. `uWSGI doc <https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads>`_
-    在Flask backend中,PyWebIO使用单独一个线程来运行事件循环。如果程序中没有使用到asyncio中的协程函数,
-    可以在 start_flask_server 参数中设置 ``disable_asyncio=False`` 来关闭对asyncio协程函数的支持。
-    如果您需要使用asyncio协程函数,那么需要在在uWSGI中使用 ``--enable-thread`` 选项开启线程支持。
-
+    在 CoroutineBasedSession 会话中,若在协程任务函数内调用 asyncio 中的协程函数,需要使用 asyncio_coroutine
 """
-import asyncio
-import fnmatch
+import json
 import logging
 import threading
-import time
-from functools import partial
-from typing import Dict
 
-from flask import Flask, request, jsonify, send_from_directory, Response
+from flask import Flask, request, send_from_directory, Response
 
-from ..session import CoroutineBasedSession, AbstractSession, register_session_implement_for_target
+from .httpbased import HttpContext, HttpHandler, run_event_loop
+from ..session import register_session_implement_for_target
 from ..utils import STATIC_PATH, iscoroutinefunction, isgeneratorfunction
-from ..utils import random_str, LRUDict
 
 logger = logging.getLogger(__name__)
 
-# todo: use lock to avoid thread race condition
-
-# type: Dict[str, AbstractSession]
-_webio_sessions = {}  # WebIOSessionID -> WebIOSession()
-_webio_expire = LRUDict()  # WebIOSessionID -> last active timestamp。按照最后活跃时间递增排列
-
-DEFAULT_SESSION_EXPIRE_SECONDS = 60  # 超过60s会话不活跃则视为会话过期
-SESSIONS_CLEANUP_INTERVAL = 20  # 清理过期会话间隔(秒)
-WAIT_MS_ON_POST = 100  # 在处理完POST请求时,等待WAIT_MS_ON_POST毫秒再读取返回数据。Task的command可以立即返回
-
-_event_loop = None
-
-
-def _make_response(webio_session: AbstractSession):
-    return jsonify(webio_session.get_task_commands())
-
-
-def _remove_expired_sessions(session_expire_seconds):
-    logger.debug("removing expired sessions")
-    """清除当前会话列表中的过期会话"""
-    while _webio_expire:
-        sid, active_ts = _webio_expire.popitem(last=False)
-
-        if time.time() - active_ts < session_expire_seconds:
-            # 当前session未过期
-            _webio_expire[sid] = active_ts
-            _webio_expire.move_to_end(sid, last=False)
-            break
-
-        # 清理session
-        logger.debug("session %s expired" % sid)
-        session = _webio_sessions.get(sid)
-        if session:
-            session.close()
-            del _webio_sessions[sid]
-
-
-_last_check_session_expire_ts = 0  # 上次检查session有效期的时间戳
-
 
-def _remove_webio_session(sid):
-    _webio_sessions.pop(sid, None)
-    _webio_expire.pop(sid, None)
+class FlaskHttpContext(HttpContext):
+    def __init__(self):
+        self.response = Response()
+        self.request_data = request.get_data()
 
+    def request_method(self):
+        """返回当前请求的方法,大写"""
+        return request.method
 
-def cors_headers(origin, check_origin, headers=None):
-    if headers is None:
-        headers = {}
+    def request_headers(self):
+        """返回当前请求的header字典"""
+        return request.headers
 
-    if check_origin(origin):
-        headers['Access-Control-Allow-Origin'] = origin
-        headers['Access-Control-Allow-Methods'] = 'GET, POST'
-        headers['Access-Control-Allow-Headers'] = 'content-type, webio-session-id'
-        headers['Access-Control-Expose-Headers'] = 'webio-session-id'
-        headers['Access-Control-Max-Age'] = 1440 * 60
+    def request_url_parameter(self, name, default=None):
+        """返回当前请求的URL参数"""
+        return request.args.get(name, default=default)
 
-    return headers
+    def request_json(self):
+        """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
+        try:
+            return json.loads(self.request_data)
+        except Exception:
+            return None
 
+    def set_header(self, name, value):
+        """为当前响应设置header"""
+        self.response.headers[name] = value
 
-def _webio_view(target, session_cls, session_expire_seconds, session_cleanup_interval, check_origin):
-    """
-    :param target: 任务函数
-    :param session_cls: 会话实现类
-    :param session_expire_seconds: 会话不活跃过期时间。
-    :param session_cleanup_interval: 会话清理间隔。
-    :param callable check_origin: callback(origin) -> bool
-    :return:
-    """
-    global _last_check_session_expire_ts, _event_loop
-    if _event_loop:
-        asyncio.set_event_loop(_event_loop)
-
-    if request.method == 'OPTIONS':  # preflight request for CORS
-        headers = cors_headers(request.headers.get('Origin', ''), check_origin)
-        return Response('', headers=headers, status=204)
-
-    headers = {}
-
-    if request.headers.get('Origin'):  # set headers for CORS request
-        headers = cors_headers(request.headers.get('Origin'), check_origin, headers=headers)
-
-    if request.args.get('test'):  # 测试接口,当会话使用给予http的backend时,返回 ok
-        return Response('ok', headers=headers)
-
-    webio_session_id = None
-
-    # webio-session-id 的请求头为空时,创建新 Session
-    if 'webio-session-id' not in request.headers or not request.headers['webio-session-id']:  # start new WebIOSession
-        webio_session_id = random_str(24)
-        headers['webio-session-id'] = webio_session_id
-        webio_session = session_cls(target)
-        _webio_sessions[webio_session_id] = webio_session
-    elif request.headers['webio-session-id'] not in _webio_sessions:  # WebIOSession deleted
-        return jsonify([dict(command='close_session')])
-    else:
-        webio_session_id = request.headers['webio-session-id']
-        webio_session = _webio_sessions[webio_session_id]
-
-    if request.method == 'POST':  # client push event
-        if request.json is not None:
-            webio_session.send_client_event(request.json)
-            time.sleep(WAIT_MS_ON_POST / 1000.0)
-    elif request.method == 'GET':  # client pull messages
-        pass
+    def set_status(self, status: int):
+        """为当前响应设置http status"""
+        self.response.status_code = status
 
-    _webio_expire[webio_session_id] = time.time()
-    # clean up at intervals
-    if time.time() - _last_check_session_expire_ts > session_cleanup_interval:
-        _last_check_session_expire_ts = time.time()
-        _remove_expired_sessions(session_expire_seconds)
+    def set_content(self, content, json_type=False):
+        """设置相应的内容
 
-    response = _make_response(webio_session)
+        :param content:
+        :param bool json_type: content是否要序列化成json格式,并将 content-type 设置为application/json
+        """
+        if json_type:
+            self.set_header('content-type', 'application/json')
+            self.response.data = json.dumps(content)
+        else:
+            self.response.data = content
 
-    if webio_session.closed():
-        _remove_webio_session(webio_session_id)
-
-    # set header to response
-    for k, v in headers.items():
-        response.headers[k] = v
-
-    return response
+    def get_response(self):
+        """获取当前的响应对象,用于在私图函数中返回"""
+        return self.response
 
 
 def webio_view(target,
-               session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
-               session_cleanup_interval=SESSIONS_CLEANUP_INTERVAL,
+               session_expire_seconds=None,
+               session_cleanup_interval=None,
                allowed_origins=None, check_origin=None):
-    """获取用于与Flask进行整合的view函数
+    """获取用于与后端实现进行整合的view函数,基于http请求与前端进行通讯
 
     :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
     :param int session_expire_seconds: 会话不活跃过期时间。
@@ -180,44 +88,25 @@ def webio_view(target,
         返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
     :return: Flask视图函数
     """
-
     session_cls = register_session_implement_for_target(target)
+    handler = HttpHandler(target=target, session_cls=session_cls,
+                          session_expire_seconds=session_expire_seconds,
+                          session_cleanup_interval=session_cleanup_interval,
+                          allowed_origins=allowed_origins, check_origin=check_origin)
 
-    if check_origin is None:
-        check_origin = lambda origin: any(
-            fnmatch.fnmatch(origin, patten)
-            for patten in allowed_origins or []
-        )
+    def view_func():
+        context = FlaskHttpContext()
+        return handler.handle_request(context)
 
-    view_func = partial(_webio_view, target=target, session_cls=session_cls,
-                        session_expire_seconds=session_expire_seconds,
-                        session_cleanup_interval=session_cleanup_interval,
-                        check_origin=check_origin)
     view_func.__name__ = 'webio_view'
     return view_func
 
 
-def run_event_loop(debug=False):
-    """运行事件循环
-
-    基于协程的会话在启动Flask服务器之前需要启动一个单独的线程来运行事件循环。
-
-    :param debug: Set the debug mode of the event loop.
-       See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
-    """
-    global _event_loop
-    CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
-    _event_loop = asyncio.new_event_loop()
-    _event_loop.set_debug(debug)
-    asyncio.set_event_loop(_event_loop)
-    _event_loop.run_forever()
-
-
 def start_server(target, port=8080, host='localhost',
                  allowed_origins=None, check_origin=None,
                  disable_asyncio=False,
-                 session_cleanup_interval=SESSIONS_CLEANUP_INTERVAL,
-                 session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
+                 session_cleanup_interval=None,
+                 session_expire_seconds=None,
                  debug=False, **flask_options):
     """启动一个 Flask server 来运行PyWebIO的 ``target`` 服务
 

+ 224 - 0
pywebio/platform/httpbased.py

@@ -0,0 +1,224 @@
+"""
+本模块提供基于Http轮训的后端通用类和函数
+
+.. attention::
+    PyWebIO 的会话状态保存在进程内,所以不支持多进程部署的后端服务
+        比如使用 ``uWSGI`` 部署后端服务,并使用 ``--processes n`` 选项设置了多进程;
+        或者使用 ``nginx`` 等反向代理将流量负载到多个后端副本上。
+
+    A note on run backend server with uWSGI:
+
+    If you start uWSGI without threads, the Python GIL will not be enabled,
+    so threads generated by your application will never run.
+    `uWSGI doc <https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads>`_
+
+"""
+import asyncio
+import fnmatch
+import logging
+import threading
+from typing import Dict
+
+import time
+
+from ..session import CoroutineBasedSession, AbstractSession, register_session_implement_for_target
+from ..utils import random_str, LRUDict
+
+
+class HttpContext:
+    """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法"""
+
+    def request_method(self):
+        """返回当前请求的方法,大写"""
+        pass
+
+    def request_headers(self):
+        """返回当前请求的header字典"""
+        pass
+
+    def request_url_parameter(self, name, default=None):
+        """返回当前请求的URL参数"""
+        pass
+
+    def request_json(self):
+        """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
+        pass
+
+    def set_header(self, name, value):
+        """为当前响应设置header"""
+        pass
+
+    def set_status(self, status):
+        """为当前响应设置http status"""
+        pass
+
+    def set_content(self, content, json_type=False):
+        """设置响应的内容。方法应该仅被调用一次
+
+        :param content:
+        :param bool json_type: content是否要序列化成json格式,并将 content-type 设置为application/json
+        """
+        pass
+
+    def get_response(self):
+        """获取当前的响应对象,用于在私图函数中返回"""
+
+
+logger = logging.getLogger(__name__)
+_event_loop = None
+
+
+# todo: use lock to avoid thread race condition
+class HttpHandler:
+    # type: Dict[str, AbstractSession]
+    _webio_sessions = {}  # WebIOSessionID -> WebIOSession()
+    _webio_expire = LRUDict()  # WebIOSessionID -> last active timestamp。按照最后活跃时间递增排列
+
+    _last_check_session_expire_ts = 0  # 上次检查session有效期的时间戳
+
+    DEFAULT_SESSION_EXPIRE_SECONDS = 60  # 超过60s会话不活跃则视为会话过期
+    SESSIONS_CLEANUP_INTERVAL = 20  # 清理过期会话间隔(秒)
+    WAIT_MS_ON_POST = 100  # 在处理完POST请求时,等待WAIT_MS_ON_POST毫秒再读取返回数据。Task的command可以立即返回
+
+    @classmethod
+    def _remove_expired_sessions(cls, session_expire_seconds):
+        logger.debug("removing expired sessions")
+        """清除当前会话列表中的过期会话"""
+        while cls._webio_expire:
+            sid, active_ts = cls._webio_expire.popitem(last=False)
+
+            if time.time() - active_ts < session_expire_seconds:
+                # 当前session未过期
+                cls._webio_expire[sid] = active_ts
+                cls._webio_expire.move_to_end(sid, last=False)
+                break
+
+            # 清理session
+            logger.debug("session %s expired" % sid)
+            session = cls._webio_sessions.get(sid)
+            if session:
+                session.close()
+                del cls._webio_sessions[sid]
+
+    @classmethod
+    def _remove_webio_session(cls, sid):
+        cls._webio_sessions.pop(sid, None)
+        cls._webio_expire.pop(sid, None)
+
+    def _process_cors(self, context: HttpContext):
+        """处理跨域请求:检查请求来源并根据可访问性设置headers"""
+        origin = context.request_headers().get('Origin', '')
+        if self.check_origin(origin):
+            context.set_header('Access-Control-Allow-Origin', origin)
+            context.set_header('Access-Control-Allow-Methods', 'GET, POST')
+            context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
+            context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
+            context.set_header('Access-Control-Max-Age', str(1440 * 60))
+
+    def handle_request(self, context: HttpContext):
+        """处理请求"""
+        cls = type(self)
+
+        if _event_loop:
+            asyncio.set_event_loop(_event_loop)
+
+        request_headers = context.request_headers()
+
+        if context.request_method() == 'OPTIONS':  # preflight request for CORS
+            self._process_cors(context)
+            context.set_status(204)
+            return context.get_response()
+
+        if request_headers.get('Origin'):  # set headers for CORS request
+            self._process_cors(context)
+
+        if context.request_url_parameter('test'):  # 测试接口,当会话使用给予http的backend时,返回 ok
+            context.set_content('ok')
+            return context.get_response()
+
+        webio_session_id = None
+
+        # webio-session-id 的请求头为空时,创建新 Session
+        if 'webio-session-id' not in request_headers or not request_headers[
+            'webio-session-id']:  # start new WebIOSession
+            webio_session_id = random_str(24)
+            context.set_header('webio-session-id', webio_session_id)
+            webio_session = self.session_cls(self.target)
+            cls._webio_sessions[webio_session_id] = webio_session
+        elif request_headers['webio-session-id'] not in cls._webio_sessions:  # WebIOSession deleted
+            context.set_content([dict(command='close_session')], json_type=True)
+            return context.get_response()
+        else:
+            webio_session_id = request_headers['webio-session-id']
+            webio_session = cls._webio_sessions[webio_session_id]
+
+        if context.request_method() == 'POST':  # client push event
+            if context.request_json() is not None:
+                webio_session.send_client_event(context.request_json())
+                time.sleep(cls.WAIT_MS_ON_POST / 1000.0)
+        elif context.request_method() == 'GET':  # client pull messages
+            pass
+
+        cls._webio_expire[webio_session_id] = time.time()
+        # clean up at intervals
+        if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
+            cls._last_check_session_expire_ts = time.time()
+            self._remove_expired_sessions(self.session_expire_seconds)
+
+        context.set_content(webio_session.get_task_commands(), json_type=True)
+
+        if webio_session.closed():
+            self._remove_webio_session(webio_session_id)
+
+        return context.get_response()
+
+    def __init__(self, target,session_cls,
+                 session_expire_seconds=None,
+                 session_cleanup_interval=None,
+                 allowed_origins=None, check_origin=None):
+        """获取用于与后端实现进行整合的view函数,基于http请求与前端进行通讯
+
+        :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
+        :param int session_expire_seconds: 会话不活跃过期时间。
+        :param int session_cleanup_interval: 会话清理间隔。
+        :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
+            来源包含协议和域名和端口部分,允许使用 Unix shell 风格的匹配模式:
+
+            - ``*`` 为通配符
+            - ``?`` 匹配单个字符
+            - ``[seq]`` 匹配seq内的字符
+            - ``[!seq]`` 匹配不在seq内的字符
+
+            比如 ``https://*.example.com`` 、 ``*://*.example.com``
+        :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
+            返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
+        """
+        cls = type(self)
+
+        self.target = target
+        self.session_cls = session_cls
+        self.check_origin = check_origin
+        self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
+        self.session_cleanup_interval = session_cleanup_interval or cls.SESSIONS_CLEANUP_INTERVAL
+
+        if check_origin is None:
+            self.check_origin = lambda origin: any(
+                fnmatch.fnmatch(origin, patten)
+                for patten in allowed_origins or []
+            )
+
+
+def run_event_loop(debug=False):
+    """运行事件循环
+
+    基于协程的会话在启动基于线程的http服务器之前需要启动一个单独的线程来运行事件循环。
+
+    :param debug: Set the debug mode of the event loop.
+       See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
+    """
+    global _event_loop
+    CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
+    _event_loop = asyncio.new_event_loop()
+    _event_loop.set_debug(debug)
+    asyncio.set_event_loop(_event_loop)
+    _event_loop.run_forever()