httpbased.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
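"""
本模块提供基于Http轮询的后端通用类和函数
Common classes and functions for backends based on HTTP polling.

.. attention::
    PyWebIO 的会话状态保存在进程内,基于HTTP的会话不支持多进程部署的后端服务,
    比如使用 ``uWSGI`` 部署后端服务,并使用 ``--processes n`` 选项设置了多进程;
    或者使用 ``nginx`` 等反向代理将流量负载到多个后端副本上。

    PyWebIO keeps session state inside the process, so HTTP-based sessions do not work when the
    backend is deployed as multiple processes: for example ``uWSGI`` started with the
    ``--processes n`` option, or several backend replicas behind a load-balancing reverse proxy
    such as ``nginx``.
"""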
  8. import asyncio
  9. import fnmatch
  10. import logging
  11. import threading
  12. import time
  13. from contextlib import contextmanager
  14. from typing import Dict
  15. from .utils import make_applications, render_page
  16. from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
  17. from ..session.base import get_session_info_from_headers
  18. from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
  19. class HttpContext:
  20. """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法
  21. The context of an Http request"""
  22. backend_name = '' # 当前使用的Web框架名
  23. def request_obj(self):
  24. """返回当前请求对象
  25. Return the current request object"""
  26. pass
  27. def request_method(self):
  28. """返回当前请求的方法,大写
  29. Return the HTTP method of the current request, uppercase"""
  30. pass
  31. def request_headers(self) -> dict:
  32. """返回当前请求的header字典
  33. Return the header dictionary of the current request"""
  34. pass
  35. def request_url_parameter(self, name, default=None):
  36. """返回当前请求的URL参数
  37. Returns the value of the given URL parameter of the current request"""
  38. pass
  39. def request_json(self) -> dict:
  40. """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None
  41. Return the data (json deserialization) of the currently requested, if the data is not in json format, return None"""
  42. pass
  43. def set_header(self, name, value):
  44. """为当前响应设置header
  45. Set a header for the current response"""
  46. pass
  47. def set_status(self, status):
  48. """为当前响应设置http status
  49. Set http status for the current response"""
  50. pass
  51. def set_content(self, content, json_type=False):
  52. """设置响应的内容。方法应该仅被调用一次
  53. Set the content of the response. This method should only be called once
  54. :param str/bytes/json-able content:
  55. :param bool json_type: Whether to serialize content into json str and set content-type to application/json
  56. """
  57. pass
  58. def get_response(self):
  59. """获取当前的响应对象,用于在视图函数中返回
  60. Get the current response object"""
  61. pass
  62. def get_client_ip(self):
  63. """获取用户的ip
  64. Get the user's ip"""
  65. pass
logger = logging.getLogger(__name__)

# Event loop created and run by run_event_loop(). When it is set, HttpHandler installs it as the
# current thread's event loop (asyncio.set_event_loop) before handling each request.
_event_loop = None

# todo: use lock to avoid thread race condition
  69. class HttpHandler:
  70. """基于HTTP的后端Handler实现
  71. .. note::
  72. Don't need a lock when access HttpHandler._webio_sessions, See:
  73. https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
  74. """
  75. # type: Dict[str, Session]
  76. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  77. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time
  78. _webio_expire_lock = threading.Lock()
  79. _last_check_session_expire_ts = 0 # Timestamp of the last check session validation
  80. # After processing the POST request, wait for WAIT_MS_ON_POST milliseconds before generate response
  81. WAIT_MS_ON_POST = 100
  82. DEFAULT_SESSION_EXPIRE_SECONDS = 600 # Default session expiration time
  83. DEFAULT_SESSIONS_CLEANUP_INTERVAL = 300 # Default interval for clearing expired sessions (in seconds)
  84. @classmethod
  85. def _remove_expired_sessions(cls, session_expire_seconds):
  86. """清除当前会话列表中的过期会话"""
  87. logger.debug("removing expired sessions")
  88. while cls._webio_expire:
  89. sid, active_ts = cls._webio_expire.popitem(last=False) # 弹出最不活跃的session info
  90. if time.time() - active_ts < session_expire_seconds:
  91. # this session is not expired
  92. cls._webio_expire[sid] = active_ts
  93. cls._webio_expire.move_to_end(sid, last=False)
  94. break
  95. # clean this session
  96. logger.debug("session %s expired" % sid)
  97. session = cls._webio_sessions.get(sid)
  98. if session:
  99. session.close(nonblock=True)
  100. del cls._webio_sessions[sid]
  101. @classmethod
  102. def _remove_webio_session(cls, sid):
  103. cls._webio_sessions.pop(sid, None)
  104. cls._webio_expire.pop(sid, None)
  105. def _process_cors(self, context: HttpContext):
  106. """Handling cross-domain requests: check the source of the request and set headers"""
  107. origin = context.request_headers().get('Origin', '')
  108. if self.check_origin(origin):
  109. context.set_header('Access-Control-Allow-Origin', origin)
  110. context.set_header('Access-Control-Allow-Methods', 'GET, POST')
  111. context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
  112. context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
  113. context.set_header('Access-Control-Max-Age', str(1440 * 60))
  114. def interval_cleaning(self):
  115. # clean up at intervals
  116. cls = type(self)
  117. need_clean = False
  118. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  119. with cls._webio_expire_lock:
  120. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  121. cls._last_check_session_expire_ts = time.time()
  122. need_clean = True
  123. if need_clean:
  124. cls._remove_expired_sessions(self.session_expire_seconds)
  125. def handle_request(self, context: HttpContext):
  126. try:
  127. with self.handle_request_context(context) as sleep_dur:
  128. if sleep_dur:
  129. time.sleep(sleep_dur)
  130. except RuntimeError:
  131. pass
  132. return context.get_response()
  133. async def handle_request_async(self, context: HttpContext):
  134. try:
  135. with self.handle_request_context(context) as sleep_dur:
  136. if sleep_dur:
  137. await asyncio.sleep(sleep_dur)
  138. except RuntimeError:
  139. pass
  140. return context.get_response()
  141. @contextmanager
  142. def handle_request_context(self, context: HttpContext):
  143. """called when every http request"""
  144. cls = type(self)
  145. if _event_loop:
  146. asyncio.set_event_loop(_event_loop)
  147. request_headers = context.request_headers()
  148. # CORS process start ############################
  149. if context.request_method() == 'OPTIONS': # preflight request for CORS
  150. self._process_cors(context)
  151. context.set_status(204)
  152. return context.get_response()
  153. if request_headers.get('Origin'): # set headers for CORS request
  154. self._process_cors(context)
  155. # CORS process end ############################
  156. if context.request_url_parameter('test'): # 测试接口,当会话使用基于http的backend时,返回 ok
  157. context.set_content('ok')
  158. return context.get_response()
  159. # 对首页HTML的请求
  160. if 'webio-session-id' not in request_headers:
  161. app = self.app_loader(context)
  162. html = render_page(app, protocol='http', cdn=self.cdn)
  163. context.set_content(html)
  164. return context.get_response()
  165. webio_session_id = None
  166. # 初始请求,创建新 Session
  167. if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW':
  168. if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
  169. context.set_status(403)
  170. return context.get_response()
  171. webio_session_id = random_str(24)
  172. context.set_header('webio-session-id', webio_session_id)
  173. session_info = get_session_info_from_headers(context.request_headers())
  174. session_info['user_ip'] = context.get_client_ip()
  175. session_info['request'] = context.request_obj()
  176. session_info['backend'] = context.backend_name
  177. session_info['protocol'] = 'http'
  178. application = self.app_loader(context)
  179. if iscoroutinefunction(application) or isgeneratorfunction(application):
  180. session_cls = CoroutineBasedSession
  181. else:
  182. session_cls = ThreadBasedSession
  183. webio_session = session_cls(application, session_info=session_info)
  184. cls._webio_sessions[webio_session_id] = webio_session
  185. yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  186. elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted
  187. context.set_content([dict(command='close_session')], json_type=True)
  188. return context.get_response()
  189. else:
  190. webio_session_id = request_headers['webio-session-id']
  191. webio_session = cls._webio_sessions[webio_session_id]
  192. if context.request_method() == 'POST': # client push event
  193. if context.request_json() is not None:
  194. webio_session.send_client_event(context.request_json())
  195. yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  196. elif context.request_method() == 'GET': # client pull messages
  197. pass
  198. cls._webio_expire[webio_session_id] = time.time()
  199. self.interval_cleaning()
  200. context.set_content(webio_session.get_task_commands(), json_type=True)
  201. if webio_session.closed():
  202. self._remove_webio_session(webio_session_id)
  203. return context.get_response()
  204. def __init__(self, applications=None, app_loader=None,
  205. cdn=True,
  206. session_expire_seconds=None,
  207. session_cleanup_interval=None,
  208. allowed_origins=None, check_origin=None):
  209. """Get the view function for running PyWebIO applications in Web framework.
  210. The view communicates with the client by HTTP protocol.
  211. :param callable app_loader: PyWebIO app factory, which receives the HttpContext instance as the parameter.
  212. Can not use `app_loader` and `applications` at the same time.
  213. The rest arguments of the constructor have the same meaning as for :func:`pywebio.platform.flask.start_server()`
  214. """
  215. check_webio_js()
  216. cls = type(self)
  217. self.cdn = cdn
  218. self.check_origin = check_origin
  219. self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
  220. self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL
  221. assert applications is not None or app_loader is not None
  222. if applications is not None:
  223. applications = make_applications(applications)
  224. def get_app(context):
  225. app_name = context.request_url_parameter('app', 'index')
  226. return applications.get(app_name) or applications['index']
  227. self.app_loader = app_loader or get_app
  228. for target in (applications or {}).values():
  229. register_session_implement_for_target(target)
  230. if check_origin is None:
  231. self.check_origin = lambda origin: any(
  232. fnmatch.fnmatch(origin, patten)
  233. for patten in (allowed_origins or [])
  234. )
  235. def run_event_loop(debug=False):
  236. """run asyncio event loop
  237. See also: :ref:`Integration coroutine-based session with Web framework <coroutine_web_integration>`
  238. :param debug: Set the debug mode of the event loop.
  239. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  240. """
  241. global _event_loop
  242. CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
  243. _event_loop = asyncio.new_event_loop()
  244. _event_loop.set_debug(debug)
  245. asyncio.set_event_loop(_event_loop)
  246. _event_loop.run_forever()