tornado.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. import asyncio
  2. import fnmatch
  3. import json
  4. import logging
  5. import os
  6. import threading
  7. import webbrowser
  8. from functools import partial
  9. from urllib.parse import urlparse
  10. import tornado
  11. import tornado.httpserver
  12. import tornado.ioloop
  13. from tornado.web import StaticFileHandler
  14. from tornado.websocket import WebSocketHandler
  15. from ..session import CoroutineBasedSession, ThreadBasedSession, ScriptModeSession, \
  16. register_session_implement_for_target, AbstractSession
  17. from ..utils import get_free_port, wait_host_port, STATIC_PATH
  18. logger = logging.getLogger(__name__)
  19. def _check_origin(origin, allowed_origins, handler: WebSocketHandler):
  20. if _is_same_site(origin, handler):
  21. return True
  22. return any(
  23. fnmatch.fnmatch(origin, patten)
  24. for patten in allowed_origins
  25. )
  26. def _is_same_site(origin, handler: WebSocketHandler):
  27. parsed_origin = urlparse(origin)
  28. origin = parsed_origin.netloc
  29. origin = origin.lower()
  30. host = handler.request.headers.get("Host")
  31. # Check to see that origin matches host directly, including ports
  32. return origin == host
  33. def _webio_handler(target, session_cls, check_origin_func=_is_same_site):
  34. """获取用于Tornado进行整合的RequestHandle类
  35. :param target: 任务函数
  36. :param session_cls: 会话实现类
  37. :param callable check_origin_func: check_origin_func(origin, handler) -> bool
  38. :return: Tornado RequestHandle类
  39. """
  40. class WSHandler(WebSocketHandler):
  41. def check_origin(self, origin):
  42. return check_origin_func(origin=origin, handler=self)
  43. def get_compression_options(self):
  44. # Non-None enables compression with default options.
  45. return {}
  46. def send_msg_to_client(self, session: AbstractSession):
  47. for msg in session.get_task_commands():
  48. self.write_message(json.dumps(msg))
  49. def open(self):
  50. logger.debug("WebSocket opened")
  51. self.set_nodelay(True)
  52. self._close_from_session_tag = False # 由session主动关闭连接
  53. if session_cls is CoroutineBasedSession:
  54. self.session = CoroutineBasedSession(target, on_task_command=self.send_msg_to_client,
  55. on_session_close=self.close_from_session)
  56. elif session_cls is ThreadBasedSession:
  57. self.session = ThreadBasedSession(target, on_task_command=self.send_msg_to_client,
  58. on_session_close=self.close_from_session,
  59. loop=asyncio.get_event_loop())
  60. else:
  61. raise RuntimeError("Don't support session type:%s" % session_cls)
  62. def on_message(self, message):
  63. data = json.loads(message)
  64. if data is not None:
  65. self.session.send_client_event(data)
  66. def close_from_session(self):
  67. self._close_from_session_tag = True
  68. self.close()
  69. def on_close(self):
  70. if not self._close_from_session_tag: # 只有在由客户端主动断开连接时,才调用 session.close()
  71. self.session.close()
  72. logger.debug("WebSocket closed")
  73. return WSHandler
  74. def webio_handler(target, allowed_origins=None, check_origin=None):
  75. """获取在Tornado中运行PyWebIO任务的RequestHandle类。RequestHandle类基于WebSocket协议与浏览器进行通讯。
  76. :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
  77. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  78. 来源包含协议和域名和端口部分,允许使用 Unix shell 风格的匹配模式:
  79. - ``*`` 为通配符
  80. - ``?`` 匹配单个字符
  81. - ``[seq]`` 匹配seq内的字符
  82. - ``[!seq]`` 匹配不在seq内的字符
  83. 比如 ``https://*.example.com`` 、 ``*://*.example.com`` 、
  84. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
  85. 返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  86. :return: Tornado RequestHandle类
  87. """
  88. session_cls = register_session_implement_for_target(target)
  89. if check_origin is None:
  90. check_origin_func = partial(_check_origin, allowed_origins=allowed_origins or [])
  91. else:
  92. check_origin_func = lambda origin, handler: _is_same_site(origin, handler) or check_origin(origin)
  93. return _webio_handler(target=target, session_cls=session_cls, check_origin_func=check_origin_func)
  94. async def open_webbrowser_on_server_started(host, port):
  95. url = 'http://%s:%s' % (host, port)
  96. is_open = await wait_host_port(host, port, duration=5, delay=0.5)
  97. if is_open:
  98. logger.info('Openning %s' % url)
  99. webbrowser.open(url)
  100. else:
  101. logger.error('Open %s failed.' % url)
  102. def _setup_server(webio_handler, port=0, host='', **tornado_app_settings):
  103. if port == 0:
  104. port = get_free_port()
  105. print('Listen on %s:%s' % (host or '0.0.0.0', port))
  106. handlers = [(r"/io", webio_handler),
  107. (r"/(.*)", StaticFileHandler, {"path": STATIC_PATH, 'default_filename': 'index.html'})]
  108. app = tornado.web.Application(handlers=handlers, **tornado_app_settings)
  109. server = app.listen(port, address=host)
  110. return server, port
  111. def start_server(target, port=0, host='', debug=False,
  112. allowed_origins=None, check_origin=None,
  113. auto_open_webbrowser=False,
  114. websocket_max_message_size=None,
  115. websocket_ping_interval=None,
  116. websocket_ping_timeout=None,
  117. **tornado_app_settings):
  118. """启动一个 Tornado server 将 ``target`` 任务函数作为Web服务提供。
  119. :param target: 任务函数。任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
  120. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  121. :param int port: server bind port. set ``0`` to find a free port number to use
  122. :param str host: server bind host. ``host`` may be either an IP address or hostname. If it's a hostname,
  123. the server will listen on all IP addresses associated with the name.
  124. set empty string or to listen on all available interfaces.
  125. :param bool debug: Tornado debug mode
  126. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  127. 来源包含协议和域名和端口部分,允许使用 Unix shell 风格的匹配模式:
  128. - ``*`` 为通配符
  129. - ``?`` 匹配单个字符
  130. - ``[seq]`` 匹配seq内的字符
  131. - ``[!seq]`` 匹配不在seq内的字符
  132. 比如 ``https://*.example.com`` 、 ``*://*.example.com``
  133. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议和域名和端口部分)字符串,
  134. 返回 ``True/False`` 。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  135. :param bool auto_open_webbrowser: Whether or not auto open web browser when server is started (if the operating system allows it) .
  136. :param int websocket_max_message_size: Max bytes of a message which Tornado can accept.
  137. Messages larger than the ``websocket_max_message_size`` (default 10MiB) will not be accepted.
  138. :param int websocket_ping_interval: If set to a number, all websockets will be pinged every n seconds.
  139. This can help keep the connection alive through certain proxy servers which close idle connections,
  140. and it can detect if the websocket has failed without being properly closed.
  141. :param int websocket_ping_timeout: If the ping interval is set, and the server doesn’t receive a ‘pong’
  142. in this many seconds, it will close the websocket. The default is three times the ping interval,
  143. with a minimum of 30 seconds. Ignored if ``websocket_ping_interval`` is not set.
  144. :param tornado_app_settings: Additional keyword arguments passed to the constructor of ``tornado.web.Application``.
  145. ref: https://www.tornadoweb.org/en/stable/web.html#tornado.web.Application.settings
  146. """
  147. kwargs = locals()
  148. app_options = ['debug', 'websocket_max_message_size', 'websocket_ping_interval', 'websocket_ping_timeout']
  149. for opt in app_options:
  150. if kwargs[opt] is not None:
  151. tornado_app_settings[opt] = kwargs[opt]
  152. handler = webio_handler(target, allowed_origins=allowed_origins, check_origin=check_origin)
  153. _, port = _setup_server(webio_handler=handler, port=port, host=host, **tornado_app_settings)
  154. if auto_open_webbrowser:
  155. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, host or 'localhost', port)
  156. tornado.ioloop.IOLoop.current().start()
  157. def start_server_in_current_thread_session():
  158. """启动 script mode 的server,监听可用端口,并自动打开浏览器
  159. PYWEBIO_SCRIPT_MODE_PORT环境变量可以设置监听端口,并关闭自动打开浏览器,用于测试
  160. """
  161. websocket_conn_opened = threading.Event()
  162. thread = threading.current_thread()
  163. class SingleSessionWSHandler(_webio_handler(target=None, session_cls=None)):
  164. session = None
  165. instance = None
  166. def open(self):
  167. self.main_session = False
  168. if SingleSessionWSHandler.session is None:
  169. self.main_session = True
  170. SingleSessionWSHandler.instance = self
  171. SingleSessionWSHandler.session = ScriptModeSession(thread,
  172. on_task_command=self.send_msg_to_client,
  173. loop=asyncio.get_event_loop())
  174. websocket_conn_opened.set()
  175. else:
  176. self.close()
  177. def on_close(self):
  178. if SingleSessionWSHandler.session is not None and self.main_session:
  179. self.session.close()
  180. logger.debug('ScriptModeSession closed')
  181. async def wait_to_stop_loop(server):
  182. """当只剩当前线程和Daemon线程运行时,关闭Server"""
  183. alive_none_daemonic_thread_cnt = None # 包括当前线程在内的非Daemon线程数
  184. while alive_none_daemonic_thread_cnt != 1:
  185. alive_none_daemonic_thread_cnt = sum(
  186. 1 for t in threading.enumerate() if t.is_alive() and not t.isDaemon()
  187. )
  188. await asyncio.sleep(1)
  189. # 关闭Websocket连接
  190. if SingleSessionWSHandler.instance:
  191. SingleSessionWSHandler.instance.close()
  192. server.stop()
  193. logger.debug('Closing tornado ioloop...')
  194. tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task() and not t.done()]
  195. for task in tasks: task.cancel()
  196. # 必须需要 await asyncio.sleep ,否则 t.cancel() 调用无法调度生效
  197. await asyncio.sleep(0)
  198. tornado.ioloop.IOLoop.current().stop()
  199. def server_thread():
  200. loop = asyncio.new_event_loop()
  201. asyncio.set_event_loop(loop)
  202. port = 0
  203. if os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"):
  204. port = int(os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"))
  205. server, port = _setup_server(webio_handler=SingleSessionWSHandler, port=port, host='localhost')
  206. tornado.ioloop.IOLoop.current().spawn_callback(partial(wait_to_stop_loop, server=server))
  207. if "PYWEBIO_SCRIPT_MODE_PORT" not in os.environ:
  208. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, 'localhost', port)
  209. tornado.ioloop.IOLoop.current().start()
  210. logger.debug('Tornado server exit')
  211. t = threading.Thread(target=server_thread, name='Tornado-server')
  212. t.start()
  213. websocket_conn_opened.wait()