1
0

http.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
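"""
本模块提供基于Http轮询的后端通用类和函数
Common classes and functions for backends based on HTTP polling.

.. attention::
    PyWebIO 的会话状态保存在进程内,基于HTTP的会话不支持多进程部署的后端服务,
    比如使用 ``uWSGI`` 部署后端服务,并使用 ``--processes n`` 选项设置了多进程;
    或者使用 ``nginx`` 等反向代理将流量负载到多个后端副本上。

    PyWebIO keeps session state inside the process, so HTTP-based sessions do not work when
    the backend is deployed as multiple processes: for example ``uWSGI`` started with
    ``--processes n``, or a reverse proxy such as ``nginx`` spreading traffic over several
    backend replicas.
"""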
  8. import asyncio
  9. import fnmatch
  10. import json
  11. import logging
  12. import threading
  13. import time
  14. from contextlib import contextmanager
  15. from typing import Dict, Optional, List
  16. from collections import deque
  17. from ..page import make_applications, render_page
  18. from ..utils import deserialize_binary_event
  19. from ...session import CoroutineBasedSession, ThreadBasedSession, register_session_implement_for_target
  20. from ...session.base import get_session_info_from_headers, Session
  21. from ...utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
  22. class HttpContext:
  23. """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法
  24. The context of an Http request"""
  25. backend_name = '' # 当前使用的Web框架名
  26. def request_obj(self):
  27. """返回当前请求对象
  28. Return the current request object"""
  29. pass
  30. def request_method(self) -> str:
  31. """返回当前请求的方法,大写
  32. Return the HTTP method of the current request, uppercase"""
  33. pass
  34. def request_headers(self) -> Dict:
  35. """返回当前请求的header字典
  36. Return the header dictionary of the current request"""
  37. pass
  38. def request_url_parameter(self, name, default=None) -> str:
  39. """返回当前请求的URL参数
  40. Returns the value of the given URL parameter of the current request"""
  41. pass
  42. def request_body(self) -> bytes:
  43. """返回当前请求的body数据
  44. Returns the data of the current request body
  45. :return: bytes/bytearray
  46. """
  47. return b''
  48. def set_header(self, name, value):
  49. """为当前响应设置header
  50. Set a header for the current response"""
  51. pass
  52. def set_status(self, status):
  53. """为当前响应设置http status
  54. Set http status for the current response"""
  55. pass
  56. def set_content(self, content, json_type=False):
  57. """设置响应的内容。方法应该仅被调用一次
  58. Set the content of the response. This method should only be called once
  59. :param str/bytes/json-able content:
  60. :param bool json_type: Whether to serialize content into json str and set content-type to application/json
  61. """
  62. pass
  63. def get_response(self):
  64. """获取当前的响应对象,用于在视图函数中返回
  65. Get the current response object"""
  66. pass
  67. def get_client_ip(self) -> str:
  68. """获取用户的ip
  69. Get the user's ip"""
  70. pass
  71. logger = logging.getLogger(__name__)
  72. _event_loop = None
  73. class ReliableTransport:
  74. def __init__(self, session: Session, message_window: int = 4):
  75. self.session = session
  76. self.messages = deque()
  77. self.window_size = message_window
  78. self.min_msg_id = 0 # the id of the first message in the window
  79. self.finished_event_id = -1 # the id of the last finished event
  80. @staticmethod
  81. def close_message(ack):
  82. return dict(
  83. commands=[[dict(command='close_session')]],
  84. seq=ack + 1
  85. )
  86. def push_event(self, events: List[Dict], seq: int) -> int:
  87. """Send client events to the session and return the success message count"""
  88. if not events:
  89. return 0
  90. submit_cnt = 0
  91. for eid, event in enumerate(events, start=seq):
  92. if eid > self.finished_event_id:
  93. self.finished_event_id = eid # todo: use lock for check and set operation
  94. self.session.send_client_event(event)
  95. submit_cnt += 1
  96. return submit_cnt
  97. def get_response(self, ack=0):
  98. """
  99. ack num is the number of messages that the client has received.
  100. response is a list of messages that the client should receive, along with their min id `seq`.
  101. """
  102. while ack >= self.min_msg_id and self.messages:
  103. self.messages.popleft()
  104. self.min_msg_id += 1
  105. if len(self.messages) < self.window_size:
  106. msgs = self.session.get_task_commands()
  107. if msgs:
  108. self.messages.append(msgs)
  109. return dict(
  110. commands=list(self.messages),
  111. seq=self.min_msg_id,
  112. ack=self.finished_event_id
  113. )
  114. # todo: use lock to avoid thread race condition
  115. class HttpHandler:
  116. """基于HTTP的后端Handler实现
  117. .. note::
  118. Don't need a lock when access HttpHandler._webio_sessions, See:
  119. https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
  120. """
  121. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  122. _webio_transports = {} # WebIOSessionID -> ReliableTransport(), type: Dict[str, ReliableTransport]
  123. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time
  124. _webio_expire_lock = threading.Lock()
  125. _last_check_session_expire_ts = 0 # Timestamp of the last check session validation
  126. # After processing the POST request, wait for WAIT_MS_ON_POST milliseconds before generate response
  127. WAIT_MS_ON_POST = 100
  128. DEFAULT_SESSION_EXPIRE_SECONDS = 600 # Default session expiration time
  129. DEFAULT_SESSIONS_CLEANUP_INTERVAL = 300 # Default interval for clearing expired sessions (in seconds)
  130. @classmethod
  131. def _remove_expired_sessions(cls, session_expire_seconds):
  132. """清除当前会话列表中的过期会话"""
  133. logger.debug("removing expired sessions")
  134. while cls._webio_expire:
  135. sid, active_ts = cls._webio_expire.popitem(last=False) # 弹出最不活跃的session info
  136. if time.time() - active_ts < session_expire_seconds:
  137. # this session is not expired
  138. cls._webio_expire[sid] = active_ts
  139. cls._webio_expire.move_to_end(sid, last=False)
  140. break
  141. # clean this session
  142. logger.debug("session %s expired" % sid)
  143. session = cls._webio_sessions.get(sid)
  144. if session:
  145. session.close(nonblock=True)
  146. del cls._webio_sessions[sid]
  147. del cls._webio_transports[sid]
  148. @classmethod
  149. def _remove_webio_session(cls, sid):
  150. cls._webio_sessions.pop(sid, None)
  151. cls._webio_expire.pop(sid, None)
  152. def _process_cors(self, context: HttpContext):
  153. """Handling cross-domain requests: check the source of the request and set headers"""
  154. origin = context.request_headers().get('Origin', '')
  155. if self.check_origin(origin):
  156. context.set_header('Access-Control-Allow-Origin', origin)
  157. context.set_header('Access-Control-Allow-Methods', 'GET, POST')
  158. context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
  159. context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
  160. context.set_header('Access-Control-Max-Age', str(1440 * 60))
  161. def interval_cleaning(self):
  162. # clean up at intervals
  163. cls = type(self)
  164. need_clean = False
  165. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  166. with cls._webio_expire_lock:
  167. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  168. cls._last_check_session_expire_ts = time.time()
  169. need_clean = True
  170. if need_clean:
  171. cls._remove_expired_sessions(self.session_expire_seconds)
  172. def handle_request(self, context: HttpContext):
  173. try:
  174. with self.handle_request_context(context) as sleep_dur:
  175. if sleep_dur:
  176. time.sleep(sleep_dur)
  177. except RuntimeError:
  178. pass
  179. return context.get_response()
  180. async def handle_request_async(self, context: HttpContext):
  181. try:
  182. with self.handle_request_context(context) as sleep_dur:
  183. if sleep_dur:
  184. await asyncio.sleep(sleep_dur)
  185. except RuntimeError:
  186. pass
  187. return context.get_response()
  188. def get_cdn(self, context):
  189. if self.cdn is True and context.request_url_parameter('_pywebio_cdn', '') == 'false':
  190. return False
  191. return self.cdn
  192. def read_event_data(self, context: HttpContext) -> List[Dict]:
  193. try:
  194. if context.request_headers().get('content-type') == 'application/octet-stream':
  195. return [deserialize_binary_event(context.request_body())]
  196. return json.loads(context.request_body())
  197. except Exception:
  198. return []
  199. @contextmanager
  200. def handle_request_context(self, context: HttpContext):
  201. """called when every http request"""
  202. cls = type(self)
  203. if _event_loop:
  204. asyncio.set_event_loop(_event_loop)
  205. request_headers = context.request_headers()
  206. # CORS process start ############################
  207. if context.request_method() == 'OPTIONS': # preflight request for CORS
  208. self._process_cors(context)
  209. context.set_status(204)
  210. return context.get_response()
  211. if request_headers.get('Origin'): # set headers for CORS request
  212. self._process_cors(context)
  213. # CORS process end ############################
  214. if context.request_url_parameter('test'): # 测试接口,当会话使用基于http的backend时,返回 ok
  215. context.set_content('ok')
  216. return context.get_response()
  217. # 对首页HTML的请求
  218. if 'webio-session-id' not in request_headers:
  219. app = self.app_loader(context)
  220. html = render_page(app, protocol='http', cdn=self.get_cdn(context))
  221. context.set_content(html)
  222. return context.get_response()
  223. ack = int(context.request_url_parameter('ack', 0))
  224. webio_session_id = request_headers['webio-session-id']
  225. new_request = False
  226. if webio_session_id.startswith('NEW-'):
  227. new_request = True
  228. webio_session_id = webio_session_id[4:]
  229. if new_request and webio_session_id not in cls._webio_sessions: # 初始请求,创建新 Session
  230. if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
  231. context.set_status(403)
  232. return context.get_response()
  233. session_info = get_session_info_from_headers(context.request_headers())
  234. session_info['user_ip'] = context.get_client_ip()
  235. session_info['request'] = context.request_obj()
  236. session_info['backend'] = context.backend_name
  237. session_info['protocol'] = 'http'
  238. application = self.app_loader(context)
  239. if iscoroutinefunction(application) or isgeneratorfunction(application):
  240. session_cls = CoroutineBasedSession
  241. else:
  242. session_cls = ThreadBasedSession
  243. webio_session = session_cls(application, session_info=session_info)
  244. cls._webio_sessions[webio_session_id] = webio_session
  245. cls._webio_transports[webio_session_id] = ReliableTransport(webio_session)
  246. yield cls.WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  247. elif webio_session_id not in cls._webio_sessions: # WebIOSession deleted
  248. close_msg = ReliableTransport.close_message(ack)
  249. context.set_content(close_msg, json_type=True)
  250. return context.get_response()
  251. else:
  252. # in this case, the request_headers['webio-session-id'] may also startswith NEW,
  253. # this is because the response for the previous new session request has not been received by the client,
  254. # and the client has sent a new request with the same session id.
  255. webio_session = cls._webio_sessions[webio_session_id]
  256. if context.request_method() == 'POST': # client push event
  257. seq = int(context.request_url_parameter('seq', 0))
  258. event_data = self.read_event_data(context)
  259. submit_cnt = cls._webio_transports[webio_session_id].push_event(event_data, seq)
  260. if submit_cnt > 0:
  261. yield type(self).WAIT_MS_ON_POST / 1000.0 # <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <--- <---
  262. elif context.request_method() == 'GET': # client pull messages
  263. pass
  264. cls._webio_expire[webio_session_id] = time.time()
  265. self.interval_cleaning()
  266. resp = cls._webio_transports[webio_session_id].get_response(ack)
  267. context.set_content(resp, json_type=True)
  268. if webio_session.closed():
  269. self._remove_webio_session(webio_session_id)
  270. return context.get_response()
  271. def __init__(self, applications=None, app_loader=None,
  272. cdn=True,
  273. session_expire_seconds=None,
  274. session_cleanup_interval=None,
  275. allowed_origins=None, check_origin=None):
  276. """Get the view function for running PyWebIO applications in Web framework.
  277. The view communicates with the client by HTTP protocol.
  278. :param callable app_loader: PyWebIO app factory, which receives the HttpContext instance as the parameter.
  279. Can not use `app_loader` and `applications` at the same time.
  280. The rest arguments of the constructor have the same meaning as for :func:`pywebio.platform.flask.start_server()`
  281. """
  282. check_webio_js()
  283. cls = type(self)
  284. self.cdn = cdn
  285. self.check_origin = check_origin
  286. self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
  287. self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL
  288. assert applications is not None or app_loader is not None
  289. if applications is not None:
  290. applications = make_applications(applications)
  291. def get_app(context):
  292. app_name = context.request_url_parameter('app', 'index')
  293. return applications.get(app_name) or applications['index']
  294. self.app_loader = app_loader or get_app
  295. for target in (applications or {}).values():
  296. register_session_implement_for_target(target)
  297. if check_origin is None:
  298. self.check_origin = lambda origin: any(
  299. fnmatch.fnmatch(origin, pattern)
  300. for pattern in (allowed_origins or [])
  301. )
  302. def run_event_loop(debug=False):
  303. """run asyncio event loop
  304. See also: :ref:`Integration coroutine-based session with Web framework <coroutine_web_integration>`
  305. :param debug: Set the debug mode of the event loop.
  306. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  307. """
  308. global _event_loop
  309. CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
  310. _event_loop = asyncio.new_event_loop()
  311. _event_loop.set_debug(debug)
  312. asyncio.set_event_loop(_event_loop)
  313. _event_loop.run_forever()