# httpbased.py
"""
Generic classes and functions for backends based on HTTP polling.

.. attention::

    PyWebIO keeps session state inside the process, so a backend service
    deployed with multiple processes is not supported. For example: running the
    backend with ``uWSGI`` and the ``--processes n`` option, or using a reverse
    proxy such as ``nginx`` to balance traffic across several backend replicas.

A note on running the backend server with uWSGI:

If you start uWSGI without threads, the Python GIL will not be enabled,
so threads generated by your application will never run.
`uWSGI doc <https://uwsgi-docs.readthedocs.io/en/latest/WSGIquickstart.html#a-note-on-python-threads>`_
"""
import asyncio
import fnmatch
import logging
import threading
import time
from typing import Dict

from .utils import make_applications
from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
from ..session.base import get_session_info_from_headers
from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction
  22. class HttpContext:
  23. """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法"""
  24. backend_name = '' # 当前使用的Web框架名
  25. def request_obj(self):
  26. """返回当前请求对象"""
  27. pass
  28. def request_method(self):
  29. """返回当前请求的方法,大写"""
  30. pass
  31. def request_headers(self):
  32. """返回当前请求的header字典"""
  33. pass
  34. def request_url_parameter(self, name, default=None):
  35. """返回当前请求的URL参数"""
  36. pass
  37. def request_json(self) -> dict:
  38. """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None"""
  39. pass
  40. def set_header(self, name, value):
  41. """为当前响应设置header"""
  42. pass
  43. def set_status(self, status):
  44. """为当前响应设置http status"""
  45. pass
  46. def set_content(self, content, json_type=False):
  47. """设置响应的内容。方法应该仅被调用一次
  48. :param content:
  49. :param bool json_type: content是否要序列化成json格式,并将 content-type 设置为application/json
  50. """
  51. pass
  52. def get_response(self):
  53. """获取当前的响应对象,用于在私图函数中返回"""
  54. pass
  55. def get_client_ip(self):
  56. """获取用户的ip"""
  57. pass
  58. logger = logging.getLogger(__name__)
  59. _event_loop = None
  60. # todo: use lock to avoid thread race condition
  61. class HttpHandler:
  62. """基于HTTP的后端Handler实现
  63. .. note::
  64. 对 HttpHandler._webio_sessions 的访问不需要加锁, See:
  65. https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
  66. """
  67. # type: Dict[str, Session]
  68. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  69. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp。按照最后活跃时间递增排列
  70. _webio_expire_lock = threading.Lock()
  71. _last_check_session_expire_ts = 0 # 上次检查session有效期的时间戳
  72. WAIT_MS_ON_POST = 100 # 在处理完POST请求时,等待WAIT_MS_ON_POST毫秒再读取返回数据。Task的command可以立即返回
  73. DEFAULT_SESSION_EXPIRE_SECONDS = 60 # 默认会话过期时间
  74. DEFAULT_SESSIONS_CLEANUP_INTERVAL = 20 # 默认清理过期会话间隔(秒)
  75. @classmethod
  76. def _remove_expired_sessions(cls, session_expire_seconds):
  77. """清除当前会话列表中的过期会话"""
  78. logger.debug("removing expired sessions")
  79. while cls._webio_expire:
  80. sid, active_ts = cls._webio_expire.popitem(last=False) # 弹出最不活跃的session info
  81. if time.time() - active_ts < session_expire_seconds:
  82. # 当前session未过期
  83. cls._webio_expire[sid] = active_ts
  84. cls._webio_expire.move_to_end(sid, last=False)
  85. break
  86. # 清理session
  87. logger.debug("session %s expired" % sid)
  88. session = cls._webio_sessions.get(sid)
  89. if session:
  90. session.close()
  91. del cls._webio_sessions[sid]
  92. @classmethod
  93. def _remove_webio_session(cls, sid):
  94. cls._webio_sessions.pop(sid, None)
  95. cls._webio_expire.pop(sid, None)
  96. def _process_cors(self, context: HttpContext):
  97. """处理跨域请求:检查请求来源并根据可访问性设置headers"""
  98. origin = context.request_headers().get('Origin', '')
  99. if self.check_origin(origin):
  100. context.set_header('Access-Control-Allow-Origin', origin)
  101. context.set_header('Access-Control-Allow-Methods', 'GET, POST')
  102. context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
  103. context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
  104. context.set_header('Access-Control-Max-Age', str(1440 * 60))
  105. def interval_cleaning(self):
  106. # clean up at intervals
  107. cls = type(self)
  108. need_clean = False
  109. with cls._webio_expire_lock:
  110. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  111. cls._last_check_session_expire_ts = time.time()
  112. need_clean = True
  113. if need_clean:
  114. cls._remove_expired_sessions(self.session_expire_seconds)
  115. def handle_request(self, context: HttpContext):
  116. """处理请求"""
  117. cls = type(self)
  118. if _event_loop:
  119. asyncio.set_event_loop(_event_loop)
  120. request_headers = context.request_headers()
  121. if context.request_method() == 'OPTIONS': # preflight request for CORS
  122. self._process_cors(context)
  123. context.set_status(204)
  124. return context.get_response()
  125. if request_headers.get('Origin'): # set headers for CORS request
  126. self._process_cors(context)
  127. if context.request_url_parameter('test'): # 测试接口,当会话使用给予http的backend时,返回 ok
  128. context.set_content('ok')
  129. return context.get_response()
  130. webio_session_id = None
  131. # webio-session-id 的请求头为空时,创建新 Session
  132. if 'webio-session-id' not in request_headers or not request_headers['webio-session-id']:
  133. if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
  134. context.set_status(403)
  135. return context.get_response()
  136. webio_session_id = random_str(24)
  137. context.set_header('webio-session-id', webio_session_id)
  138. session_info = get_session_info_from_headers(context.request_headers())
  139. session_info['user_ip'] = context.get_client_ip()
  140. session_info['request'] = context.request_obj()
  141. session_info['backend'] = context.backend_name
  142. app_name = context.request_url_parameter('app', 'index')
  143. application = self.applications.get(app_name) or self.applications['index']
  144. if iscoroutinefunction(application) or isgeneratorfunction(application):
  145. session_cls = CoroutineBasedSession
  146. else:
  147. session_cls = ThreadBasedSession
  148. webio_session = session_cls(application, session_info=session_info)
  149. cls._webio_sessions[webio_session_id] = webio_session
  150. time.sleep(cls.WAIT_MS_ON_POST / 1000.0) # 等待session输出完毕
  151. elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted
  152. context.set_content([dict(command='close_session')], json_type=True)
  153. return context.get_response()
  154. else:
  155. webio_session_id = request_headers['webio-session-id']
  156. webio_session = cls._webio_sessions[webio_session_id]
  157. if context.request_method() == 'POST': # client push event
  158. if context.request_json() is not None:
  159. webio_session.send_client_event(context.request_json())
  160. time.sleep(cls.WAIT_MS_ON_POST / 1000.0) # 等待session输出完毕
  161. elif context.request_method() == 'GET': # client pull messages
  162. pass
  163. cls._webio_expire[webio_session_id] = time.time()
  164. self.interval_cleaning()
  165. context.set_content(webio_session.get_task_commands(), json_type=True)
  166. if webio_session.closed():
  167. self._remove_webio_session(webio_session_id)
  168. return context.get_response()
  169. def __init__(self, applications,
  170. session_expire_seconds=None,
  171. session_cleanup_interval=None,
  172. allowed_origins=None, check_origin=None):
  173. """获取用于与后端实现进行整合的view函数,基于http请求与前端进行通讯
  174. :param list/dict/callable applications: PyWebIO应用. 可以是任务函数或者任务函数的字典或列表。
  175. :param int session_expire_seconds: 会话不活跃过期时间。
  176. :param int session_cleanup_interval: 会话清理间隔。
  177. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  178. 来源包含协议和域名和端口部分,允许使用 Unix shell 风格的匹配模式:
  179. - ``*`` 为通配符
  180. - ``?`` 匹配单个字符
  181. - ``[seq]`` 匹配seq内的字符
  182. - ``[!seq]`` 匹配不在seq内的字符
  183. 比如 ``https://*.example.com`` 、 ``*://*.example.com``
  184. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
  185. 返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  186. """
  187. cls = type(self)
  188. self.applications = make_applications(applications)
  189. self.check_origin = check_origin
  190. self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
  191. self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL
  192. for target in self.applications.values():
  193. register_session_implement_for_target(target)
  194. if check_origin is None:
  195. self.check_origin = lambda origin: any(
  196. fnmatch.fnmatch(origin, patten)
  197. for patten in allowed_origins or []
  198. )
  199. def run_event_loop(debug=False):
  200. """运行事件循环
  201. 基于协程的会话在启动基于线程的http服务器之前需要启动一个单独的线程来运行事件循环。
  202. :param debug: Set the debug mode of the event loop.
  203. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  204. """
  205. global _event_loop
  206. CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
  207. _event_loop = asyncio.new_event_loop()
  208. _event_loop.set_debug(debug)
  209. asyncio.set_event_loop(_event_loop)
  210. _event_loop.run_forever()