httpbased.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. """
  2. 本模块提供基于HTTP轮询的后端通用类和函数 (common classes and functions for HTTP polling-based backends)
  3. .. attention::
  4. PyWebIO 的会话状态保存在进程内,基于HTTP的会话不支持多进程部署的后端服务
  5. 比如使用 ``uWSGI`` 部署后端服务,并使用 ``--processes n`` 选项设置了多进程;
  6. 或者使用 ``nginx`` 等反向代理将流量负载到多个后端副本上。
  7. """
  8. import asyncio
  9. import fnmatch
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from contextlib import contextmanager
  15. from typing import Dict
  16. from .page import make_applications, render_page
  17. from .utils import deserialize_binary_event
  18. from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
  19. from ..session.base import get_session_info_from_headers
  20. from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
  21. class HttpContext:
  22. """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法
  23. The context of an Http request"""
  24. backend_name = '' # 当前使用的Web框架名
  25. def request_obj(self):
  26. """返回当前请求对象
  27. Return the current request object"""
  28. pass
  29. def request_method(self):
  30. """返回当前请求的方法,大写
  31. Return the HTTP method of the current request, uppercase"""
  32. pass
  33. def request_headers(self) -> dict:
  34. """返回当前请求的header字典
  35. Return the header dictionary of the current request"""
  36. pass
  37. def request_url_parameter(self, name, default=None):
  38. """返回当前请求的URL参数
  39. Returns the value of the given URL parameter of the current request"""
  40. pass
  41. def request_body(self):
  42. """返回当前请求的body数据
  43. Returns the data of the current request body
  44. :return: bytes/bytearray
  45. """
  46. return b''
  47. def request_json(self) -> dict:
  48. """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None
  49. Return the data (json deserialization) of the currently requested, if the data is not in json format, return None"""
  50. try:
  51. if self.request_headers().get('content-type') == 'application/octet-stream':
  52. return deserialize_binary_event(self.request_body())
  53. return json.loads(self.request_body())
  54. except Exception:
  55. return None
  56. def set_header(self, name, value):
  57. """为当前响应设置header
  58. Set a header for the current response"""
  59. pass
  60. def set_status(self, status):
  61. """为当前响应设置http status
  62. Set http status for the current response"""
  63. pass
  64. def set_content(self, content, json_type=False):
  65. """设置响应的内容。方法应该仅被调用一次
  66. Set the content of the response. This method should only be called once
  67. :param str/bytes/json-able content:
  68. :param bool json_type: Whether to serialize content into json str and set content-type to application/json
  69. """
  70. pass
  71. def get_response(self):
  72. """获取当前的响应对象,用于在视图函数中返回
  73. Get the current response object"""
  74. pass
  75. def get_client_ip(self):
  76. """获取用户的ip
  77. Get the user's ip"""
  78. pass
  79. logger = logging.getLogger(__name__)
  80. _event_loop = None
  81. # todo: use lock to avoid thread race condition
  82. class HttpHandler:
  83. """基于HTTP的后端Handler实现
  84. .. note::
  85. Don't need a lock when access HttpHandler._webio_sessions, See:
  86. https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
  87. """
  88. # type: Dict[str, Session]
  89. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  90. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time
  91. _webio_expire_lock = threading.Lock()
  92. _last_check_session_expire_ts = 0 # Timestamp of the last check session validation
  93. # After processing the POST request, wait for WAIT_MS_ON_POST milliseconds before generate response
  94. WAIT_MS_ON_POST = 100
  95. DEFAULT_SESSION_EXPIRE_SECONDS = 600 # Default session expiration time
  96. DEFAULT_SESSIONS_CLEANUP_INTERVAL = 300 # Default interval for clearing expired sessions (in seconds)
  97. @classmethod
  98. def _remove_expired_sessions(cls, session_expire_seconds):
  99. """清除当前会话列表中的过期会话"""
  100. logger.debug("removing expired sessions")
  101. while cls._webio_expire:
  102. sid, active_ts = cls._webio_expire.popitem(last=False) # 弹出最不活跃的session info
  103. if time.time() - active_ts < session_expire_seconds:
  104. # this session is not expired
  105. cls._webio_expire[sid] = active_ts
  106. cls._webio_expire.move_to_end(sid, last=False)
  107. break
  108. # clean this session
  109. logger.debug("session %s expired" % sid)
  110. session = cls._webio_sessions.get(sid)
  111. if session:
  112. session.close(nonblock=True)
  113. del cls._webio_sessions[sid]
  114. @classmethod
  115. def _remove_webio_session(cls, sid):
  116. cls._webio_sessions.pop(sid, None)
  117. cls._webio_expire.pop(sid, None)
  118. def _process_cors(self, context: HttpContext):
  119. """Handling cross-domain requests: check the source of the request and set headers"""
  120. origin = context.request_headers().get('Origin', '')
  121. if self.check_origin(origin):
  122. context.set_header('Access-Control-Allow-Origin', origin)
  123. context.set_header('Access-Control-Allow-Methods', 'GET, POST')
  124. context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
  125. context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
  126. context.set_header('Access-Control-Max-Age', str(1440 * 60))
  127. def interval_cleaning(self):
  128. # clean up at intervals
  129. cls = type(self)
  130. need_clean = False
  131. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  132. with cls._webio_expire_lock:
  133. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  134. cls._last_check_session_expire_ts = time.time()
  135. need_clean = True
  136. if need_clean:
  137. cls._remove_expired_sessions(self.session_expire_seconds)
  138. def handle_request(self, context: HttpContext):
  139. try:
  140. with self.handle_request_context(context) as sleep_dur:
  141. if sleep_dur:
  142. time.sleep(sleep_dur)
  143. except RuntimeError:
  144. pass
  145. return context.get_response()
  146. async def handle_request_async(self, context: HttpContext):
  147. try:
  148. with self.handle_request_context(context) as sleep_dur:
  149. if sleep_dur:
  150. await asyncio.sleep(sleep_dur)
  151. except RuntimeError:
  152. pass
  153. return context.get_response()
  154. def get_cdn(self, context):
  155. if self.cdn is True and context.request_url_parameter('_pywebio_cdn', '') == 'false':
  156. return False
  157. return self.cdn
  158. @contextmanager
  159. def handle_request_context(self, context: HttpContext):
  160. """called when every http request"""
  161. cls = type(self)
  162. if _event_loop:
  163. asyncio.set_event_loop(_event_loop)
  164. request_headers = context.request_headers()
  165. # CORS process start ############################
  166. if context.request_method() == 'OPTIONS': # preflight request for CORS
  167. self._process_cors(context)
  168. context.set_status(204)
  169. return context.get_response()
  170. if request_headers.get('Origin'): # set headers for CORS request
  171. self._process_cors(context)
  172. # CORS process end ############################
  173. if context.request_url_parameter('test'): # 测试接口,当会话使用基于http的backend时,返回 ok
  174. context.set_content('ok')
  175. return context.get_response()
  176. # 对首页HTML的请求
  177. if 'webio-session-id' not in request_headers:
  178. app = self.app_loader(context)
  179. html = render_page(app, protocol='http', cdn=self.get_cdn(context))
  180. context.set_content(html)
  181. return context.get_response()
  182. webio_session_id = None
  183. # 初始请求,创建新 Session
  184. if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW':
  185. if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
  186. context.set_status(403)
  187. return context.get_response()
  188. webio_session_id = random_str(24)
  189. context.set_header('webio-session-id', webio_session_id)
  190. session_info = get_session_info_from_headers(context.request_headers())
  191. session_info['user_ip'] = context.get_client_ip()
  192. session_info['request'] = context.request_obj()
  193. session_info['backend'] = context.backend_name
  194. session_info['protocol'] = 'http'
  195. application = self.app_loader(context)
  196. if iscoroutinefunction(application) or isgeneratorfunction(application):
  197. session_cls = CoroutineBasedSession
  198. else:
  199. session_cls = ThreadBasedSession
  200. webio_session = session_cls(application, session_info=session_info)
  201. cls._webio_sessions[webio_session_id] = webio_session
  202. yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  203. elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted
  204. context.set_content([dict(command='close_session')], json_type=True)
  205. return context.get_response()
  206. else:
  207. webio_session_id = request_headers['webio-session-id']
  208. webio_session = cls._webio_sessions[webio_session_id]
  209. if context.request_method() == 'POST': # client push event
  210. if context.request_json() is not None:
  211. webio_session.send_client_event(context.request_json())
  212. yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  213. elif context.request_method() == 'GET': # client pull messages
  214. pass
  215. cls._webio_expire[webio_session_id] = time.time()
  216. self.interval_cleaning()
  217. context.set_content(webio_session.get_task_commands(), json_type=True)
  218. if webio_session.closed():
  219. self._remove_webio_session(webio_session_id)
  220. return context.get_response()
  221. def __init__(self, applications=None, app_loader=None,
  222. cdn=True,
  223. session_expire_seconds=None,
  224. session_cleanup_interval=None,
  225. allowed_origins=None, check_origin=None):
  226. """Get the view function for running PyWebIO applications in Web framework.
  227. The view communicates with the client by HTTP protocol.
  228. :param callable app_loader: PyWebIO app factory, which receives the HttpContext instance as the parameter.
  229. Can not use `app_loader` and `applications` at the same time.
  230. The rest arguments of the constructor have the same meaning as for :func:`pywebio.platform.flask.start_server()`
  231. """
  232. check_webio_js()
  233. cls = type(self)
  234. self.cdn = cdn
  235. self.check_origin = check_origin
  236. self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
  237. self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL
  238. assert applications is not None or app_loader is not None
  239. if applications is not None:
  240. applications = make_applications(applications)
  241. def get_app(context):
  242. app_name = context.request_url_parameter('app', 'index')
  243. return applications.get(app_name) or applications['index']
  244. self.app_loader = app_loader or get_app
  245. for target in (applications or {}).values():
  246. register_session_implement_for_target(target)
  247. if check_origin is None:
  248. self.check_origin = lambda origin: any(
  249. fnmatch.fnmatch(origin, pattern)
  250. for pattern in (allowed_origins or [])
  251. )
  252. def run_event_loop(debug=False):
  253. """run asyncio event loop
  254. See also: :ref:`Integration coroutine-based session with Web framework <coroutine_web_integration>`
  255. :param debug: Set the debug mode of the event loop.
  256. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  257. """
  258. global _event_loop
  259. CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
  260. _event_loop = asyncio.new_event_loop()
  261. _event_loop.set_debug(debug)
  262. asyncio.set_event_loop(_event_loop)
  263. _event_loop.run_forever()