1
0

httpbased.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
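  """
  This module provides the framework-independent classes and functions of the HTTP-polling-based backend.

  .. attention::

      PyWebIO keeps session state inside the server process, so HTTP-based sessions do not support
      backend services deployed as multiple processes. For example, deploying the backend with ``uWSGI``
      and enabling multiple processes through the ``--processes n`` option,
      or using a reverse proxy such as ``nginx`` to balance traffic across several backend replicas.
  """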
  8. import asyncio
  9. import fnmatch
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from contextlib import contextmanager
  15. from typing import Dict
  16. from .utils import make_applications, render_page, deserialize_binary_event
  17. from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
  18. from ..session.base import get_session_info_from_headers
  19. from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
  class HttpContext:
      """The context of one HTTP request.

      Each web framework backend is expected to subclass this and implement the
      methods below with the facilities that framework provides. In this base
      class every hook is a no-op returning ``None``, except ``request_body()``
      (returns ``b''``) and ``request_json()`` (built on the other hooks).
      """

      backend_name = ''  # Name of the web framework currently in use

      def request_obj(self):
          """Return the current request object."""

      def request_method(self):
          """Return the HTTP method of the current request, in uppercase."""

      def request_headers(self) -> dict:
          """Return the header dictionary of the current request."""

      def request_url_parameter(self, name, default=None):
          """Return the value of the URL parameter ``name`` of the current request."""

      def request_body(self):
          """Return the data of the current request body.

          :return: bytes/bytearray
          """
          return b''

      def request_json(self) -> dict:
          """Return the deserialized payload of the current request.

          A body sent with content-type ``application/octet-stream`` is decoded with
          ``deserialize_binary_event``; any other body is parsed as JSON.

          :return: the decoded data, or ``None`` if anything goes wrong while
              reading or decoding the request
          """
          try:
              content_type = self.request_headers().get('content-type')
              raw_body = self.request_body()
              # The decoder name is only resolved for the branch that is actually taken
              decode = deserialize_binary_event if content_type == 'application/octet-stream' else json.loads
              return decode(raw_body)
          except Exception:
              return None

      def set_header(self, name, value):
          """Set a header for the current response."""

      def set_status(self, status):
          """Set the HTTP status for the current response."""

      def set_content(self, content, json_type=False):
          """Set the content of the response. This method should only be called once.

          :param str/bytes/json-able content:
          :param bool json_type: Whether to serialize content into json str and set content-type to application/json
          """

      def get_response(self):
          """Get the current response object, to be returned from the view function."""

      def get_client_ip(self):
          """Get the user's ip."""
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)
  # asyncio event loop shared with the HTTP handler: assigned by run_event_loop(),
  # and installed as the current loop at the start of every handled request when it is set.
  _event_loop = None
  # todo: use lock to avoid thread race condition
  class HttpHandler:
      """HTTP-based backend handler.

      A web framework adapter wraps each incoming request in an ``HttpContext`` and
      passes it to :meth:`handle_request` (or :meth:`handle_request_async`). Depending
      on the request, the handler serves the page HTML, creates a session, forwards a
      client event to an existing session, and replies with the commands returned by
      the session's ``get_task_commands()``.

      Sessions and their last-active timestamps are stored on the class, so they are
      shared by every handler instance in the process.

      .. note::
          Don't need a lock when access HttpHandler._webio_sessions, See:
          https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
      """

      # WebIOSessionID -> WebIOSession()
      _webio_sessions = {}  # type: Dict[str, Session]
      _webio_expire = LRUDict()  # WebIOSessionID -> last active timestamp. In increasing order of last active time
      _webio_expire_lock = threading.Lock()

      _last_check_session_expire_ts = 0  # Timestamp of the last check session validation

      # After processing the POST request, wait for WAIT_MS_ON_POST milliseconds before generate response
      WAIT_MS_ON_POST = 100

      DEFAULT_SESSION_EXPIRE_SECONDS = 600  # Default session expiration time
      DEFAULT_SESSIONS_CLEANUP_INTERVAL = 300  # Default interval for clearing expired sessions (in seconds)

      @classmethod
      def _remove_expired_sessions(cls, session_expire_seconds):
          """Close and forget the sessions that have expired.

          Entries are popped from the front of ``_webio_expire`` until one is found
          whose last activity is less than ``session_expire_seconds`` seconds old;
          that entry is put back at the front and the scan stops.

          :param session_expire_seconds: idle time (seconds) after which a session is expired
          """
          logger.debug("removing expired sessions")
          while cls._webio_expire:
              sid, active_ts = cls._webio_expire.popitem(last=False)  # pop the least active session info

              if time.time() - active_ts < session_expire_seconds:
                  # this session is not expired: restore it at the front and stop
                  cls._webio_expire[sid] = active_ts
                  cls._webio_expire.move_to_end(sid, last=False)
                  break

              # clean this session
              logger.debug("session %s expired", sid)
              session = cls._webio_sessions.get(sid)
              if session:
                  session.close(nonblock=True)
                  # pop() rather than del: another request may have removed the session meanwhile
                  cls._webio_sessions.pop(sid, None)

      @classmethod
      def _remove_webio_session(cls, sid):
          """Drop session ``sid`` from both the session table and the expiry table (missing ids are ignored)."""
          cls._webio_sessions.pop(sid, None)
          cls._webio_expire.pop(sid, None)

      def _process_cors(self, context: HttpContext):
          """Handling cross-domain requests: check the source of the request and set headers.

          The CORS response headers are only set when ``self.check_origin`` accepts the
          request's ``Origin`` header (an absent header is checked as ``''``).
          """
          origin = context.request_headers().get('Origin', '')
          if self.check_origin(origin):
              context.set_header('Access-Control-Allow-Origin', origin)
              context.set_header('Access-Control-Allow-Methods', 'GET, POST')
              context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
              context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
              context.set_header('Access-Control-Max-Age', str(1440 * 60))

      def interval_cleaning(self):
          """Remove expired sessions if more than ``session_cleanup_interval`` seconds passed since the last check."""
          # clean up at intervals
          cls = type(self)
          need_clean = False

          if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
              with cls._webio_expire_lock:
                  # Check again while holding the lock, so that only one caller claims this round
                  if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
                      cls._last_check_session_expire_ts = time.time()
                      need_clean = True

          if need_clean:
              cls._remove_expired_sessions(self.session_expire_seconds)

      def handle_request(self, context: HttpContext):
          """Handle one HTTP request synchronously and return the framework response object.

          If :meth:`handle_request_context` yields a wait duration, the calling thread
          sleeps for it with ``time.sleep``.

          NOTE: ``contextlib.contextmanager`` raises ``RuntimeError`` when the wrapped
          generator finishes without yielding, which is what ``handle_request_context``
          does on every path that needs no wait. That error is deliberately ignored
          here; be aware that any other ``RuntimeError`` raised while handling the
          request is ignored as well.
          """
          try:
              with self.handle_request_context(context) as sleep_dur:
                  if sleep_dur:
                      time.sleep(sleep_dur)
          except RuntimeError:
              pass

          return context.get_response()

      async def handle_request_async(self, context: HttpContext):
          """Coroutine version of :meth:`handle_request`: waits with ``asyncio.sleep`` instead of ``time.sleep``.

          ``RuntimeError`` is ignored for the same reason as in :meth:`handle_request`.
          """
          try:
              with self.handle_request_context(context) as sleep_dur:
                  if sleep_dur:
                      await asyncio.sleep(sleep_dur)
          except RuntimeError:
              pass

          return context.get_response()

      @contextmanager
      def handle_request_context(self, context: HttpContext):
          """Called on every HTTP request; does all request processing except the waiting.

          The generator yields at most once. The yielded value is the number of seconds
          the caller should wait before the response content is produced; the code after
          the ``yield`` then fills in the response. Paths that need no wait finish
          without yielding (see the note in :meth:`handle_request`).

          :param HttpContext context: the context of the current request
          """
          cls = type(self)

          if _event_loop:
              asyncio.set_event_loop(_event_loop)

          request_headers = context.request_headers()

          # CORS process start ############################
          if context.request_method() == 'OPTIONS':  # preflight request for CORS
              self._process_cors(context)
              context.set_status(204)
              return context.get_response()

          if request_headers.get('Origin'):  # set headers for CORS request
              self._process_cors(context)
          # CORS process end ############################

          # Test endpoint: answers 'ok' when the session uses the http-based backend
          if context.request_url_parameter('test'):
              context.set_content('ok')
              return context.get_response()

          # Request for the index page HTML
          if 'webio-session-id' not in request_headers:
              app = self.app_loader(context)
              html = render_page(app, protocol='http', cdn=self.cdn)
              context.set_content(html)
              return context.get_response()

          webio_session_id = None

          # Initial request: create a new Session
          if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW':
              # Sessions must not be created by POST requests, to prevent CSRF attacks
              if context.request_method() == 'POST':
                  context.set_status(403)
                  return context.get_response()

              webio_session_id = random_str(24)
              context.set_header('webio-session-id', webio_session_id)
              session_info = get_session_info_from_headers(context.request_headers())
              session_info['user_ip'] = context.get_client_ip()
              session_info['request'] = context.request_obj()
              session_info['backend'] = context.backend_name
              session_info['protocol'] = 'http'

              application = self.app_loader(context)
              if iscoroutinefunction(application) or isgeneratorfunction(application):
                  session_cls = CoroutineBasedSession
              else:
                  session_cls = ThreadBasedSession
              webio_session = session_cls(application, session_info=session_info)
              cls._webio_sessions[webio_session_id] = webio_session
              # Ask the caller to wait before the first commands are collected
              yield type(self).WAIT_MS_ON_POST / 1000.0
          elif request_headers['webio-session-id'] not in cls._webio_sessions:  # WebIOSession deleted
              context.set_content([dict(command='close_session')], json_type=True)
              return context.get_response()
          else:
              webio_session_id = request_headers['webio-session-id']
              webio_session = cls._webio_sessions[webio_session_id]

              if context.request_method() == 'POST':  # client push event
                  # Decode the body once: the event that is checked is the event that is sent
                  event = context.request_json()
                  if event is not None:
                      webio_session.send_client_event(event)
                      # Ask the caller to wait before the commands are collected
                      yield type(self).WAIT_MS_ON_POST / 1000.0
              elif context.request_method() == 'GET':  # client pull messages
                  pass

          cls._webio_expire[webio_session_id] = time.time()

          self.interval_cleaning()

          context.set_content(webio_session.get_task_commands(), json_type=True)

          if webio_session.closed():
              self._remove_webio_session(webio_session_id)

          return context.get_response()

      def __init__(self, applications=None, app_loader=None,
                   cdn=True,
                   session_expire_seconds=None,
                   session_cleanup_interval=None,
                   allowed_origins=None, check_origin=None):
          """Get the view function for running PyWebIO applications in Web framework.
          The view communicates with the client by HTTP protocol.

          :param applications: PyWebIO application(s); normalized with ``make_applications``.
              The app served is selected by the ``app`` URL parameter, falling back to ``'index'``.
          :param callable app_loader: PyWebIO app factory, which receives the HttpContext instance as the parameter.
              Can not use `app_loader` and `applications` at the same time.
          :param cdn: passed through to ``render_page`` when serving the page HTML
          :param session_expire_seconds: session expiration time;
              a falsy value selects ``DEFAULT_SESSION_EXPIRE_SECONDS``
          :param session_cleanup_interval: interval (seconds) for clearing expired sessions;
              a falsy value selects ``DEFAULT_SESSIONS_CLEANUP_INTERVAL``
          :param allowed_origins: ``fnmatch`` patterns of accepted origins; only used when ``check_origin`` is None
          :param callable check_origin: receives the request's origin string and returns whether it is allowed
          :raises AssertionError: if neither ``applications`` nor ``app_loader`` is given

          The rest arguments of the constructor have the same meaning as for :func:`pywebio.platform.flask.start_server()`
          """
          check_webio_js()

          cls = type(self)

          self.cdn = cdn
          self.check_origin = check_origin
          self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
          self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL

          assert applications is not None or app_loader is not None, \
              "one of `applications` and `app_loader` must be provided"
          if applications is not None:
              applications = make_applications(applications)

          def get_app(context):
              # Default loader: pick the app by the `app` URL parameter, falling back to 'index'
              app_name = context.request_url_parameter('app', 'index')
              return applications.get(app_name) or applications['index']

          self.app_loader = app_loader or get_app

          for target in (applications or {}).values():
              register_session_implement_for_target(target)

          if check_origin is None:
              # No custom checker: accept origins matching any of the allowed_origins patterns
              self.check_origin = lambda origin: any(
                  fnmatch.fnmatch(origin, pattern)
                  for pattern in (allowed_origins or [])
              )
  def run_event_loop(debug=False):
      """Create a new asyncio event loop and run it forever in the calling thread.

      Before the loop starts, the calling thread's id is stored in
      ``CoroutineBasedSession.event_loop_thread_id``, the loop is stored in the
      module-level ``_event_loop`` and it is set as the current event loop.
      The call then blocks in ``run_forever()``.

      See also: :ref:`Integration coroutine-based session with Web framework <coroutine_web_integration>`

      :param debug: Set the debug mode of the event loop.
          See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
      """
      global _event_loop

      this_thread = threading.current_thread()
      CoroutineBasedSession.event_loop_thread_id = this_thread.ident

      # Publish the loop in the module global as soon as it exists
      loop = _event_loop = asyncio.new_event_loop()
      loop.set_debug(debug)
      asyncio.set_event_loop(loop)
      loop.run_forever()