flask.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. """
  2. Flask backend
  3. """
  4. import asyncio
  5. import threading
  6. import time
  7. from functools import partial
  8. from typing import Dict
  9. from flask import Flask, request, jsonify, send_from_directory
  10. from . import STATIC_PATH
  11. from ..framework import WebIOSession
  12. from ..utils import random_str, LRUDict
  13. # todo Flask 的线程模型是否会造成竞争条件?
  14. _webio_sessions: Dict[str, WebIOSession] = {} # WebIOSessionID -> WebIOSession()
  15. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp
  16. DEFAULT_SESSION_EXPIRE_SECONDS = 60 * 60 * 4 # 超过4个小时会话不活跃则视为会话过期
  17. REMOVE_EXPIRED_SESSIONS_INTERVAL = 120 # 清理过期会话间隔(秒)
  18. _event_loop = None
  19. def _make_response(webio_session: WebIOSession):
  20. res = webio_session.unhandled_server_msgs
  21. webio_session.unhandled_server_msgs = []
  22. return jsonify(res)
  23. def _remove_expired_sessions(session_expire_seconds):
  24. while _webio_expire:
  25. sid, active_ts = _webio_expire.popitem(last=False)
  26. if time.time() - active_ts < session_expire_seconds:
  27. _webio_expire[sid] = active_ts
  28. _webio_expire.move_to_end(sid, last=False)
  29. break
  30. del _webio_sessions[sid]
  31. _last_check_session_expire_ts = 0 # 上次检查session有效期的时间戳
  32. def _remove_webio_session(sid):
  33. del _webio_sessions[sid]
  34. del _webio_expire[sid]
  35. def _webio_view(coro_func, session_expire_seconds):
  36. """
  37. todo use cookie instead of session
  38. :param coro_func:
  39. :param session_expire_seconds:
  40. :return:
  41. """
  42. global _last_check_session_expire_ts, _event_loop
  43. if _event_loop:
  44. asyncio.set_event_loop(_event_loop)
  45. webio_session_id = None
  46. if 'webio_session_id' not in request.cookies: # start new WebIOSession
  47. webio_session_id = random_str(24)
  48. webio_session = WebIOSession(coro_func)
  49. _webio_sessions[webio_session_id] = webio_session
  50. _webio_expire[webio_session_id] = time.time()
  51. elif request.cookies['webio_session_id'] not in _webio_sessions: # WebIOSession deleted
  52. return jsonify([dict(command='close_session')])
  53. else:
  54. webio_session_id = request.cookies['webio_session_id']
  55. webio_session = _webio_sessions[webio_session_id]
  56. if request.method == 'POST': # client push event
  57. webio_session.send_client_msg(request.json)
  58. elif request.method == 'GET': # client pull messages
  59. pass
  60. if time.time() - _last_check_session_expire_ts > REMOVE_EXPIRED_SESSIONS_INTERVAL:
  61. _remove_expired_sessions(session_expire_seconds)
  62. _last_check_session_expire_ts = time.time()
  63. response = _make_response(webio_session)
  64. if webio_session.closed():
  65. _remove_webio_session(webio_session_id)
  66. elif 'webio_session_id' not in request.cookies:
  67. response.set_cookie('webio_session_id', webio_session_id)
  68. return response
  69. def webio_view(coro_func, session_expire_seconds):
  70. """获取Flask view"""
  71. view_func = partial(_webio_view, coro_func=coro_func, session_expire_seconds=session_expire_seconds)
  72. view_func.__name__ = 'webio_view'
  73. return view_func
  74. def _setup_event_loop():
  75. global _event_loop
  76. _event_loop = asyncio.new_event_loop()
  77. _event_loop.set_debug(True)
  78. asyncio.set_event_loop(_event_loop)
  79. _event_loop.run_forever()
  80. def start_flask_server(coro_func, port=8080, host='localhost', disable_asyncio=False,
  81. session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
  82. debug=False, **flask_options):
  83. """
  84. :param coro_func:
  85. :param port:
  86. :param host:
  87. :param disable_asyncio: 禁用 asyncio 函数。在Flask backend中使用asyncio需要单独开启一个线程来运行事件循环,
  88. 若程序中没有使用到asyncio中的异步函数,可以开启此选项来避免不必要的资源浪费
  89. :param session_expire_seconds:
  90. :param debug:
  91. :param flask_options:
  92. :return:
  93. """
  94. app = Flask(__name__)
  95. app.route('/io', methods=['GET', 'POST'])(webio_view(coro_func, session_expire_seconds))
  96. @app.route('/')
  97. def index_page():
  98. return send_from_directory(STATIC_PATH, 'index.html')
  99. @app.route('/<path:static_file>')
  100. def serve_static_file(static_file):
  101. return send_from_directory(STATIC_PATH, static_file)
  102. if not disable_asyncio:
  103. threading.Thread(target=_setup_event_loop, daemon=True).start()
  104. app.run(host=host, port=port, debug=debug, **flask_options)