flask.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. """
  2. Flask backend
  3. """
  4. import asyncio
  5. import threading
  6. import time
  7. from functools import partial
  8. from typing import Dict
  9. from flask import Flask, request, jsonify, send_from_directory
  10. from . import STATIC_PATH
  11. from ..framework import WebIOSession
  12. from ..utils import random_str, LRUDict
  13. # todo Flask 的线程模型是否会造成竞争条件?
  14. _webio_sessions: Dict[str, WebIOSession] = {} # WebIOSessionID -> WebIOSession()
  15. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp
  16. DEFAULT_SESSION_EXPIRE_SECONDS = 60 * 60 * 4 # 超过4个小时会话不活跃则视为会话过期
  17. REMOVE_EXPIRED_SESSIONS_INTERVAL = 120 # 清理过期会话间隔(秒)
  18. _event_loop = None
  19. def _make_response(webio_session: WebIOSession):
  20. res = webio_session.unhandled_server_msgs
  21. webio_session.unhandled_server_msgs = []
  22. return jsonify(res)
  23. def _remove_expired_sessions(session_expire_seconds):
  24. while _webio_expire:
  25. sid, active_ts = _webio_expire.popitem(last=False)
  26. if time.time() - active_ts < session_expire_seconds:
  27. _webio_expire[sid] = active_ts
  28. _webio_expire.move_to_end(sid, last=False)
  29. break
  30. del _webio_sessions[sid]
  31. _last_check_session_expire_ts = 0 # 上次检查session有效期的时间戳
  32. def _remove_webio_session(sid):
  33. del _webio_sessions[sid]
  34. del _webio_expire[sid]
  35. def _webio_view(coro_func, session_expire_seconds):
  36. """
  37. todo use cookie instead of session
  38. :param coro_func:
  39. :param session_expire_seconds:
  40. :return:
  41. """
  42. if request.args.get('test'): # 测试接口,当会话使用给予http的backend时,返回 ok
  43. return 'ok'
  44. global _last_check_session_expire_ts, _event_loop
  45. if _event_loop:
  46. asyncio.set_event_loop(_event_loop)
  47. webio_session_id = None
  48. if 'webio_session_id' not in request.cookies: # start new WebIOSession
  49. webio_session_id = random_str(24)
  50. webio_session = WebIOSession(coro_func)
  51. _webio_sessions[webio_session_id] = webio_session
  52. _webio_expire[webio_session_id] = time.time()
  53. elif request.cookies['webio_session_id'] not in _webio_sessions: # WebIOSession deleted
  54. return jsonify([dict(command='close_session')])
  55. else:
  56. webio_session_id = request.cookies['webio_session_id']
  57. webio_session = _webio_sessions[webio_session_id]
  58. if request.method == 'POST': # client push event
  59. webio_session.send_client_msg(request.json)
  60. elif request.method == 'GET': # client pull messages
  61. pass
  62. if time.time() - _last_check_session_expire_ts > REMOVE_EXPIRED_SESSIONS_INTERVAL:
  63. _remove_expired_sessions(session_expire_seconds)
  64. _last_check_session_expire_ts = time.time()
  65. response = _make_response(webio_session)
  66. if webio_session.closed():
  67. _remove_webio_session(webio_session_id)
  68. elif 'webio_session_id' not in request.cookies:
  69. response.set_cookie('webio_session_id', webio_session_id)
  70. return response
  71. def webio_view(coro_func, session_expire_seconds):
  72. """获取Flask view"""
  73. view_func = partial(_webio_view, coro_func=coro_func, session_expire_seconds=session_expire_seconds)
  74. view_func.__name__ = 'webio_view'
  75. return view_func
  76. def _setup_event_loop():
  77. global _event_loop
  78. _event_loop = asyncio.new_event_loop()
  79. _event_loop.set_debug(True)
  80. asyncio.set_event_loop(_event_loop)
  81. _event_loop.run_forever()
  82. def start_flask_server(coro_func, port=8080, host='localhost', disable_asyncio=False,
  83. session_expire_seconds=DEFAULT_SESSION_EXPIRE_SECONDS,
  84. debug=False, **flask_options):
  85. """
  86. :param coro_func:
  87. :param port:
  88. :param host:
  89. :param disable_asyncio: 禁用 asyncio 函数。在Flask backend中使用asyncio需要单独开启一个线程来运行事件循环,
  90. 若程序中没有使用到asyncio中的异步函数,可以开启此选项来避免不必要的资源浪费
  91. :param session_expire_seconds:
  92. :param debug:
  93. :param flask_options:
  94. :return:
  95. """
  96. app = Flask(__name__)
  97. app.route('/io', methods=['GET', 'POST'])(webio_view(coro_func, session_expire_seconds))
  98. @app.route('/')
  99. def index_page():
  100. return send_from_directory(STATIC_PATH, 'index.html')
  101. @app.route('/<path:static_file>')
  102. def serve_static_file(static_file):
  103. return send_from_directory(STATIC_PATH, static_file)
  104. if not disable_asyncio:
  105. threading.Thread(target=_setup_event_loop, daemon=True).start()
  106. app.run(host=host, port=port, debug=debug, **flask_options)