httpbased.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
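"""
This module provides generic backend classes and functions based on HTTP polling.

.. attention::

    PyWebIO keeps session state inside the process, so HTTP-based sessions do not support
    backend services deployed with multiple processes. For example: serving the backend with
    ``uWSGI`` and enabling multiple processes through the ``--processes n`` option, or using a
    reverse proxy such as ``nginx`` to balance traffic across several backend replicas.
"""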
  8. import asyncio
  9. import fnmatch
  10. import logging
  11. import threading
  12. import time
  13. from typing import Dict
  14. from .utils import make_applications, render_page
  15. from ..session import CoroutineBasedSession, Session, ThreadBasedSession, register_session_implement_for_target
  16. from ..session.base import get_session_info_from_headers
  17. from ..utils import random_str, LRUDict, isgeneratorfunction, iscoroutinefunction, check_webio_js
  18. class HttpContext:
  19. """一次Http请求的上下文, 不同的后端框架需要根据框架提供的方法实现本类的方法
  20. The context of an Http request"""
  21. backend_name = '' # 当前使用的Web框架名
  22. def request_obj(self):
  23. """返回当前请求对象
  24. Return the current request object"""
  25. pass
  26. def request_method(self):
  27. """返回当前请求的方法,大写
  28. Return the HTTP method of the current request, uppercase"""
  29. pass
  30. def request_headers(self) -> dict:
  31. """返回当前请求的header字典
  32. Return the header dictionary of the current request"""
  33. pass
  34. def request_url_parameter(self, name, default=None):
  35. """返回当前请求的URL参数
  36. Returns the value of the given URL parameter of the current request"""
  37. pass
  38. def request_json(self) -> dict:
  39. """返回当前请求的json反序列化后的内容,若请求数据不为json格式,返回None
  40. Return the data (json deserialization) of the currently requested, if the data is not in json format, return None"""
  41. pass
  42. def set_header(self, name, value):
  43. """为当前响应设置header
  44. Set a header for the current response"""
  45. pass
  46. def set_status(self, status):
  47. """为当前响应设置http status
  48. Set http status for the current response"""
  49. pass
  50. def set_content(self, content, json_type=False):
  51. """设置响应的内容。方法应该仅被调用一次
  52. Set the content of the response. This method should only be called once
  53. :param str/bytes/json-able content:
  54. :param bool json_type: Whether to serialize content into json str and set content-type to application/json
  55. """
  56. pass
  57. def get_response(self):
  58. """获取当前的响应对象,用于在视图函数中返回
  59. Get the current response object"""
  60. pass
  61. def get_client_ip(self):
  62. """获取用户的ip
  63. Get the user's ip"""
  64. pass
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# asyncio event loop created by run_event_loop(); stays None until that function is called.
# When set, HttpHandler.handle_request() installs it as the current loop of the handling thread.
_event_loop = None
  67. # todo: use lock to avoid thread race condition
  68. class HttpHandler:
  69. """基于HTTP的后端Handler实现
  70. .. note::
  71. Don't need a lock when access HttpHandler._webio_sessions, See:
  72. https://stackoverflow.com/questions/1312331/using-a-global-dictionary-with-threads-in-python
  73. """
  74. # type: Dict[str, Session]
  75. _webio_sessions = {} # WebIOSessionID -> WebIOSession()
  76. _webio_expire = LRUDict() # WebIOSessionID -> last active timestamp. In increasing order of last active time
  77. _webio_expire_lock = threading.Lock()
  78. _last_check_session_expire_ts = 0 # Timestamp of the last check session validation
  79. # After processing the POST request, wait for WAIT_MS_ON_POST milliseconds before generate response
  80. WAIT_MS_ON_POST = 100
  81. DEFAULT_SESSION_EXPIRE_SECONDS = 600 # Default session expiration time
  82. DEFAULT_SESSIONS_CLEANUP_INTERVAL = 300 # Default interval for clearing expired sessions (in seconds)
  83. @classmethod
  84. def _remove_expired_sessions(cls, session_expire_seconds):
  85. """清除当前会话列表中的过期会话"""
  86. logger.debug("removing expired sessions")
  87. while cls._webio_expire:
  88. sid, active_ts = cls._webio_expire.popitem(last=False) # 弹出最不活跃的session info
  89. if time.time() - active_ts < session_expire_seconds:
  90. # this session is not expired
  91. cls._webio_expire[sid] = active_ts
  92. cls._webio_expire.move_to_end(sid, last=False)
  93. break
  94. # clean this session
  95. logger.debug("session %s expired" % sid)
  96. session = cls._webio_sessions.get(sid)
  97. if session:
  98. session.close()
  99. del cls._webio_sessions[sid]
  100. @classmethod
  101. def _remove_webio_session(cls, sid):
  102. cls._webio_sessions.pop(sid, None)
  103. cls._webio_expire.pop(sid, None)
  104. def _process_cors(self, context: HttpContext):
  105. """Handling cross-domain requests: check the source of the request and set headers"""
  106. origin = context.request_headers().get('Origin', '')
  107. if self.check_origin(origin):
  108. context.set_header('Access-Control-Allow-Origin', origin)
  109. context.set_header('Access-Control-Allow-Methods', 'GET, POST')
  110. context.set_header('Access-Control-Allow-Headers', 'content-type, webio-session-id')
  111. context.set_header('Access-Control-Expose-Headers', 'webio-session-id')
  112. context.set_header('Access-Control-Max-Age', str(1440 * 60))
  113. def interval_cleaning(self):
  114. # clean up at intervals
  115. cls = type(self)
  116. need_clean = False
  117. with cls._webio_expire_lock:
  118. if time.time() - cls._last_check_session_expire_ts > self.session_cleanup_interval:
  119. cls._last_check_session_expire_ts = time.time()
  120. need_clean = True
  121. if need_clean:
  122. cls._remove_expired_sessions(self.session_expire_seconds)
  123. def handle_request(self, context: HttpContext):
  124. """called when every http request"""
  125. cls = type(self)
  126. if _event_loop:
  127. asyncio.set_event_loop(_event_loop)
  128. request_headers = context.request_headers()
  129. # CORS process start ############################
  130. if context.request_method() == 'OPTIONS': # preflight request for CORS
  131. self._process_cors(context)
  132. context.set_status(204)
  133. return context.get_response()
  134. if request_headers.get('Origin'): # set headers for CORS request
  135. self._process_cors(context)
  136. # CORS process end ############################
  137. if context.request_url_parameter('test'): # 测试接口,当会话使用基于http的backend时,返回 ok
  138. context.set_content('ok')
  139. return context.get_response()
  140. # 对首页HTML的请求
  141. if 'webio-session-id' not in request_headers:
  142. app_name = context.request_url_parameter('app', 'index')
  143. app = self.applications.get(app_name) or self.applications['index']
  144. html = render_page(app, protocol='http', cdn=self.cdn)
  145. context.set_content(html)
  146. return context.get_response()
  147. webio_session_id = None
  148. # 初始请求,创建新 Session
  149. if not request_headers['webio-session-id'] or request_headers['webio-session-id'] == 'NEW':
  150. if context.request_method() == 'POST': # 不能在POST请求中创建Session,防止CSRF攻击
  151. context.set_status(403)
  152. return context.get_response()
  153. webio_session_id = random_str(24)
  154. context.set_header('webio-session-id', webio_session_id)
  155. session_info = get_session_info_from_headers(context.request_headers())
  156. session_info['user_ip'] = context.get_client_ip()
  157. session_info['request'] = context.request_obj()
  158. session_info['backend'] = context.backend_name
  159. session_info['protocol'] = 'http'
  160. app_name = context.request_url_parameter('app', 'index')
  161. application = self.applications.get(app_name) or self.applications['index']
  162. if iscoroutinefunction(application) or isgeneratorfunction(application):
  163. session_cls = CoroutineBasedSession
  164. else:
  165. session_cls = ThreadBasedSession
  166. webio_session = session_cls(application, session_info=session_info)
  167. cls._webio_sessions[webio_session_id] = webio_session
  168. time.sleep(cls.WAIT_MS_ON_POST / 1000.0) # 等待session输出完毕
  169. elif request_headers['webio-session-id'] not in cls._webio_sessions: # WebIOSession deleted
  170. context.set_content([dict(command='close_session')], json_type=True)
  171. return context.get_response()
  172. else:
  173. webio_session_id = request_headers['webio-session-id']
  174. webio_session = cls._webio_sessions[webio_session_id]
  175. if context.request_method() == 'POST': # client push event
  176. if context.request_json() is not None:
  177. webio_session.send_client_event(context.request_json())
  178. time.sleep(cls.WAIT_MS_ON_POST / 1000.0) # 等待session输出完毕
  179. elif context.request_method() == 'GET': # client pull messages
  180. pass
  181. cls._webio_expire[webio_session_id] = time.time()
  182. self.interval_cleaning()
  183. context.set_content(webio_session.get_task_commands(), json_type=True)
  184. if webio_session.closed():
  185. self._remove_webio_session(webio_session_id)
  186. return context.get_response()
  187. def __init__(self, applications, cdn,
  188. session_expire_seconds=None,
  189. session_cleanup_interval=None,
  190. allowed_origins=None, check_origin=None):
  191. """Get the view function for running PyWebIO applications in Web framework.
  192. The view communicates with the client by HTTP protocol.
  193. The arguments of the constructor have the same meaning as for :func:`pywebio.platform.flask.start_server()`
  194. """
  195. check_webio_js()
  196. cls = type(self)
  197. self.cdn = cdn
  198. self.applications = make_applications(applications)
  199. self.check_origin = check_origin
  200. self.session_expire_seconds = session_expire_seconds or cls.DEFAULT_SESSION_EXPIRE_SECONDS
  201. self.session_cleanup_interval = session_cleanup_interval or cls.DEFAULT_SESSIONS_CLEANUP_INTERVAL
  202. for target in self.applications.values():
  203. register_session_implement_for_target(target)
  204. if check_origin is None:
  205. self.check_origin = lambda origin: any(
  206. fnmatch.fnmatch(origin, patten)
  207. for patten in (allowed_origins or [])
  208. )
  209. def run_event_loop(debug=False):
  210. """run asyncio event loop
  211. See also: :ref:`Integration coroutine-based session with Web framework <coroutine_web_integration>`
  212. :param debug: Set the debug mode of the event loop.
  213. See also: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode
  214. """
  215. global _event_loop
  216. CoroutineBasedSession.event_loop_thread_id = threading.current_thread().ident
  217. _event_loop = asyncio.new_event_loop()
  218. _event_loop.set_debug(debug)
  219. asyncio.set_event_loop(_event_loop)
  220. _event_loop.run_forever()