flask.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. """
  2. Flask backend
  3. .. attention::
  4. PyWebIO 的会话状态保存在进程内,所以不支持多进程部署的Flask。
  5. 比如使用 ``uWSGI`` 部署Flask,并使用 ``--processes n`` 选项设置了多进程;
  6. 或者使用 ``nginx`` 等反向代理将流量负载到多个 Flask 副本上。
  7. A note on run Flask with uWSGI:
  8. If you start uWSGI without threads, the Python GIL will not be enabled,
  9. so threads generated by your application will never run. `uWSGI doc <https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads>`_
  10. 在Flask backend中,PyWebIO使用单独一个线程来运行事件循环。如果程序中没有使用到asyncio中的协程函数,
  11. 可以在 start_flask_server 参数中设置 ``disable_asyncio=False`` 来关闭对asyncio协程函数的支持。
  12. 如果您需要使用asyncio协程函数,那么需要在在uWSGI中使用 ``--enable-thread`` 选项开启线程支持。
  13. """
  14. import asyncio
  15. import threading
  16. import time
  17. from functools import partial
  18. from typing import Dict
  19. from flask import Flask, request, jsonify, send_from_directory
  20. from . import STATIC_PATH
  21. from ..session import AsyncBasedSession
  22. from ..utils import random_str, LRUDict
  23. # todo: use lock to avoid thread race condition
  24. _webio_sessions: Dict[str, AsyncBasedSession] = {} # WebIOSessionID -> WebIOSession()
  25. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp
  26. DEFAULT_SESSION_EXPIRE_SECONDS = 60 * 60 * 4 # 超过4个小时会话不活跃则视为会话过期
  27. REMOVE_EXPIRED_SESSIONS_INTERVAL = 120 # 清理过期会话间隔(秒)
  28. _event_loop = None
  29. def _make_response(webio_session: AsyncBasedSession):
  30. return jsonify(webio_session.get_task_messages())
  31. def _remove_expired_sessions(session_expire_seconds):
  32. while _webio_expire:
  33. sid, active_ts = _webio_expire.popitem(last=False)
  34. if time.time() - active_ts < session_expire_seconds:
  35. _webio_expire[sid] = active_ts
  36. _webio_expire.move_to_end(sid, last=False)
  37. break
  38. del _webio_sessions[sid]
  39. _last_check_session_expire_ts = 0 # 上次检查session有效期的时间戳
  40. def _remove_webio_session(sid):
  41. del _webio_sessions[sid]
  42. del _webio_expire[sid]
  43. def _webio_view(coro_func, session_expire_seconds):
  44. """
  45. todo use cookie instead of session
  46. :param coro_func:
  47. :param session_expire_seconds:
  48. :return:
  49. """
  50. if request.args.get('test'): # 测试接口,当会话使用给予http的backend时,返回 ok
  51. return 'ok'
  52. global _last_check_session_expire_ts, _event_loop
  53. if _event_loop:
  54. asyncio.set_event_loop(_event_loop)
  55. webio_session_id = None
  56. if 'webio_session_id' not in request.cookies: # start new WebIOSession
  57. webio_session_id = random_str(24)
  58. webio_session = AsyncBasedSession(coro_func)
  59. _webio_sessions[webio_session_id] = webio_session
  60. _webio_expire[webio_session_id] = time.time()
  61. elif request.cookies['webio_session_id'] not in _webio_sessions: # WebIOSession deleted
  62. return jsonify([dict(command='close_session')])
  63. else:
  64. webio_session_id = request.cookies['webio_session_id']
  65. webio_session = _webio_sessions[webio_session_id]
  66. if request.method == 'POST': # client push event
  67. webio_session.send_client_event(request.json)
  68. elif request.method == 'GET': # client pull messages
  69. pass
  70. if time.time() - _last_check_session_expire_ts > REMOVE_EXPIRED_SESSIONS_INTERVAL:
  71. _remove_expired_sessions(session_expire_seconds)
  72. _last_check_session_expire_ts = time.time()
  73. response = _make_response(webio_session)
  74. if webio_session.closed():
  75. _remove_webio_session(webio_session_id)
  76. elif 'webio_session_id' not in request.cookies:
  77. response.set_cookie('webio_session_id', webio_session_id)
  78. return response
  79. def webio_view(coro_func, session_expire_seconds):
  80. """获取Flask view"""
  81. view_func = partial(_webio_view, coro_func=coro_func, session_expire_seconds=session_expire_seconds)
  82. view_func.__name__ = 'webio_view'
  83. return view_func
  84. def _setup_event_loop():
  85. global _event_loop
  86. _event_loop = asyncio.new_event_loop()
  87. _event_loop.set_debug(True)
  88. asyncio.set_event_loop(_event_loop)
  89. _event_loop.run_forever()
  90. def start_flask_server(coro_func, port=8080, host='localhost', disable_asyncio=False,
  91. session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
  92. debug=False, **flask_options):
  93. """
  94. :param coro_func:
  95. :param port:
  96. :param host:
  97. :param disable_asyncio: 禁用 asyncio 函数。在Flask backend中使用asyncio需要单独开启一个线程来运行事件循环,
  98. 若程序中没有使用到asyncio中的异步函数,可以开启此选项来避免不必要的资源浪费
  99. :param session_expire_seconds:
  100. :param debug:
  101. :param flask_options:
  102. :return:
  103. """
  104. app = Flask(__name__)
  105. app.route('/io', methods=['GET', 'POST'])(webio_view(coro_func, session_expire_seconds))
  106. @app.route('/')
  107. def index_page():
  108. return send_from_directory(STATIC_PATH, 'index.html')
  109. @app.route('/<path:static_file>')
  110. def serve_static_file(static_file):
  111. return send_from_directory(STATIC_PATH, static_file)
  112. if not disable_asyncio:
  113. threading.Thread(target=_setup_event_loop, daemon=True).start()
  114. app.run(host=host, port=port, debug=debug, **flask_options)