flask.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. """
  2. Flask backend
  3. .. note::
  4. 在 AsyncBasedSession 会话中,若在协程任务函数内调用 asyncio 中的协程函数,需要使用 asyncio_coroutine
  5. .. attention::
  6. PyWebIO 的会话状态保存在进程内,所以不支持多进程部署的Flask。
  7. 比如使用 ``uWSGI`` 部署Flask,并使用 ``--processes n`` 选项设置了多进程;
  8. 或者使用 ``nginx`` 等反向代理将流量负载到多个 Flask 副本上。
  9. A note on run Flask with uWSGI:
  10. If you start uWSGI without threads, the Python GIL will not be enabled,
  11. so threads generated by your application will never run. `uWSGI doc <https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads>`_
  12. 在Flask backend中,PyWebIO使用单独一个线程来运行事件循环。如果程序中没有使用到asyncio中的协程函数,
  13. 可以在 start_flask_server 参数中设置 ``disable_asyncio=False`` 来关闭对asyncio协程函数的支持。
  14. 如果您需要使用asyncio协程函数,那么需要在在uWSGI中使用 ``--enable-thread`` 选项开启线程支持。
  15. """
  16. import asyncio
  17. import fnmatch
  18. import threading
  19. import time
  20. from functools import partial
  21. from typing import Dict
  22. from flask import Flask, request, jsonify, send_from_directory, Response
  23. from ..session import CoroutineBasedSession, get_session_implement, AbstractSession, \
  24. register_session_implement_for_target
  25. from ..utils import STATIC_PATH
  26. from ..utils import random_str, LRUDict
  27. # todo: use lock to avoid thread race condition
  28. # type: Dict[str, AbstractSession]
  29. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  30. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp
  31. DEFAULT_SESSION_EXPIRE_SECONDS = 60 # 超过60s会话不活跃则视为会话过期
  32. REMOVE_EXPIRED_SESSIONS_INTERVAL = 20 # 清理过期会话间隔(秒)
  33. WAIT_MS_ON_POST = 100 # 在处理完POST请求时,等待WAIT_MS_ON_POST毫秒再读取返回数据。Task的command可以立即返回
  34. _event_loop = None
  35. def _make_response(webio_session: AbstractSession):
  36. return jsonify(webio_session.get_task_commands())
  37. def _remove_expired_sessions(session_expire_seconds):
  38. while _webio_expire:
  39. sid, active_ts = _webio_expire.popitem(last=False)
  40. if time.time() - active_ts < session_expire_seconds:
  41. _webio_expire[sid] = active_ts
  42. _webio_expire.move_to_end(sid, last=False)
  43. break
  44. del _webio_sessions[sid]
  45. _last_check_session_expire_ts = 0 # 上次检查session有效期的时间戳
  46. def _remove_webio_session(sid):
  47. del _webio_sessions[sid]
  48. del _webio_expire[sid]
  49. def cors_headers(origin, check_origin, headers=None):
  50. if headers is None:
  51. headers = {}
  52. if check_origin(origin):
  53. headers['Access-Control-Allow-Origin'] = origin
  54. headers['Access-Control-Allow-Methods'] = 'GET, POST'
  55. headers['Access-Control-Allow-Headers'] = 'content-type, webio-session-id'
  56. headers['Access-Control-Expose-Headers'] = 'webio-session-id'
  57. headers['Access-Control-Max-Age'] = 1440 * 60
  58. return headers
  59. def _webio_view(target, session_cls, session_expire_seconds, check_origin):
  60. """
  61. :param target:
  62. :param session_expire_seconds:
  63. :return:
  64. """
  65. global _last_check_session_expire_ts, _event_loop
  66. if _event_loop:
  67. asyncio.set_event_loop(_event_loop)
  68. if request.method == 'OPTIONS': # preflight request for CORS
  69. headers = cors_headers(request.headers.get('Origin', ''), check_origin)
  70. return Response('', headers=headers, status=204)
  71. headers = {}
  72. if request.headers.get('Origin'): # set headers for CORS request
  73. headers = cors_headers(request.headers.get('Origin'), check_origin, headers=headers)
  74. if request.args.get('test'): # 测试接口,当会话使用给予http的backend时,返回 ok
  75. return Response('ok', headers=headers)
  76. webio_session_id = None
  77. # webio-session-id 的请求头为空时,创建新 Session
  78. if 'webio-session-id' not in request.headers or not request.headers['webio-session-id']: # start new WebIOSession
  79. webio_session_id = random_str(24)
  80. headers['webio-session-id'] = webio_session_id
  81. webio_session = session_cls(target)
  82. _webio_sessions[webio_session_id] = webio_session
  83. _webio_expire[webio_session_id] = time.time()
  84. elif request.headers['webio-session-id'] not in _webio_sessions: # WebIOSession deleted
  85. return jsonify([dict(command='close_session')])
  86. else:
  87. webio_session_id = request.headers['webio-session-id']
  88. webio_session = _webio_sessions[webio_session_id]
  89. if request.method == 'POST': # client push event
  90. webio_session.send_client_event(request.json)
  91. time.sleep(WAIT_MS_ON_POST / 1000.0)
  92. elif request.method == 'GET': # client pull messages
  93. pass
  94. # clean up at intervals
  95. if time.time() - _last_check_session_expire_ts > REMOVE_EXPIRED_SESSIONS_INTERVAL:
  96. _remove_expired_sessions(session_expire_seconds)
  97. _last_check_session_expire_ts = time.time()
  98. response = _make_response(webio_session)
  99. if webio_session.closed():
  100. _remove_webio_session(webio_session_id)
  101. # set header to response
  102. for k, v in headers.items():
  103. response.headers[k] = v
  104. return response
  105. def webio_view(target, session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS, allowed_origins=None, check_origin=None):
  106. """获取用于与Flask进行整合的view函数
  107. :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
  108. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  109. :param session_expire_seconds: 会话不活跃过期时间。
  110. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  111. 来源包含协议和域名和端口部分,允许使用 ``*`` 作为通配符。 比如 ``https://*.example.com`` 、 ``*://*.example.com`` 、
  112. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
  113. 返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  114. :return: Flask视图函数
  115. """
  116. session_cls = register_session_implement_for_target(target)
  117. if check_origin is None:
  118. check_origin = lambda origin: any(
  119. fnmatch.fnmatch(origin, patten)
  120. for patten in allowed_origins
  121. )
  122. view_func = partial(_webio_view, target=target, session_cls=session_cls,
  123. session_expire_seconds=session_expire_seconds,
  124. check_origin=check_origin)
  125. view_func.__name__ = 'webio_view'
  126. return view_func
  127. def run_event_loop(debug=False):
  128. """运行事件循环
  129. 基于协程的会话在启动Flask服务器之前需要启动一个单独的线程来运行事件循环。
  130. :param debug: Set the debug mode of the event loop.
  131. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  132. """
  133. global _event_loop
  134. _event_loop = asyncio.new_event_loop()
  135. _event_loop.set_debug(debug)
  136. asyncio.set_event_loop(_event_loop)
  137. _event_loop.run_forever()
  138. def start_server(target, port=8080, host='localhost',
  139. allowed_origins=None, check_origin=None,
  140. disable_asyncio=False,
  141. session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
  142. debug=False, **flask_options):
  143. """启动一个 Flask server 来运行PyWebIO的 ``target`` 服务
  144. :param target: task function. It's a coroutine function is use CoroutineBasedSession or
  145. a simple function is use ThreadBasedSession.
  146. :param port: server bind port. set ``0`` to find a free port number to use
  147. :param host: server bind host. ``host`` may be either an IP address or hostname. If it's a hostname,
  148. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  149. 来源包含协议和域名和端口部分,允许使用 ``*`` 作为通配符。 比如 ``https://*.example.com`` 、 ``*://*.example.com`` 、
  150. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
  151. 返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  152. :param disable_asyncio: 禁用 asyncio 函数。仅在当 ``session_type=COROUTINE_BASED`` 时有效。
  153. 在Flask backend中使用asyncio需要单独开启一个线程来运行事件循环,
  154. 若程序中没有使用到asyncio中的异步函数,可以开启此选项来避免不必要的资源浪费
  155. :param session_expire_seconds: 会话过期时间。若 session_expire_seconds 秒内没有收到客户端的请求,则认为会话过期。
  156. :param debug: Flask debug mode
  157. :param flask_options: Additional keyword arguments passed to the constructor of ``flask.Flask.run``.
  158. ref: https://flask.palletsprojects.com/en/1.1.x/api/?highlight=flask%20run#flask.Flask.run
  159. """
  160. app = Flask(__name__)
  161. app.route('/io', methods=['GET', 'POST', 'OPTIONS'])(
  162. webio_view(target, session_expire_seconds,
  163. allowed_origins=allowed_origins,
  164. check_origin=check_origin)
  165. )
  166. @app.route('/')
  167. @app.route('/<path:static_file>')
  168. def serve_static_file(static_file='index.html'):
  169. return send_from_directory(STATIC_PATH, static_file)
  170. if not disable_asyncio and get_session_implement() is CoroutineBasedSession:
  171. threading.Thread(target=run_event_loop, daemon=True).start()
  172. app.run(host=host, port=port, debug=debug, **flask_options)