1
0

tornado.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. import asyncio
  2. import fnmatch
  3. import json
  4. import logging
  5. import os
  6. import threading
  7. import webbrowser
  8. from functools import partial
  9. from urllib.parse import urlparse
  10. import tornado
  11. import tornado.httpserver
  12. import tornado.ioloop
  13. from tornado.web import StaticFileHandler
  14. from tornado.websocket import WebSocketHandler
  15. from .utils import make_applications, render_page, cdn_validation
  16. from ..session import CoroutineBasedSession, ThreadBasedSession, ScriptModeSession, \
  17. register_session_implement_for_target, Session
  18. from ..session.base import get_session_info_from_headers
  19. from ..utils import get_free_port, wait_host_port, STATIC_PATH, iscoroutinefunction, isgeneratorfunction, check_webio_js
  20. logger = logging.getLogger(__name__)
  21. _ioloop = None
  22. def ioloop() -> tornado.ioloop.IOLoop:
  23. """获得运行Tornado server的IOLoop
  24. 本方法当前仅在显示boken app时使用
  25. This method is currently only used when displaying boken app"""
  26. global _ioloop
  27. return _ioloop
  28. def _check_origin(origin, allowed_origins, handler: WebSocketHandler):
  29. if _is_same_site(origin, handler):
  30. return True
  31. return any(
  32. fnmatch.fnmatch(origin, patten)
  33. for patten in allowed_origins
  34. )
  35. def _is_same_site(origin, handler: WebSocketHandler):
  36. parsed_origin = urlparse(origin)
  37. origin = parsed_origin.netloc
  38. origin = origin.lower()
  39. host = handler.request.headers.get("Host")
  40. # Check to see that origin matches host directly, including ports
  41. return origin == host
  42. def _webio_handler(applications, cdn, check_origin_func=_is_same_site):
  43. """
  44. :param dict applications: dict of `name -> task function`
  45. :param bool/str cdn: Whether to load front-end static resources from CDN
  46. :param callable check_origin_func: check_origin_func(origin, handler) -> bool
  47. :return: Tornado RequestHandler class
  48. """
  49. check_webio_js()
  50. class WSHandler(WebSocketHandler):
  51. async def get(self, *args, **kwargs) -> None:
  52. # It's a simple http GET request
  53. if self.request.headers.get("Upgrade", "").lower() != "websocket":
  54. # Backward compatible
  55. if self.get_query_argument('test', ''):
  56. return self.write('')
  57. app_name = self.get_query_argument('app', 'index')
  58. app = applications.get(app_name) or applications['index']
  59. html = render_page(app, protocol='ws', cdn=cdn)
  60. return self.write(html)
  61. else:
  62. await super().get()
  63. def check_origin(self, origin):
  64. return check_origin_func(origin=origin, handler=self)
  65. def get_compression_options(self):
  66. # Non-None enables compression with default options.
  67. return {}
  68. def send_msg_to_client(self, session: Session):
  69. for msg in session.get_task_commands():
  70. self.write_message(json.dumps(msg))
  71. def open(self):
  72. logger.debug("WebSocket opened")
  73. # self.set_nodelay(True)
  74. # 由session主动关闭连接
  75. # connection is closed from session
  76. self._close_from_session_tag = False
  77. session_info = get_session_info_from_headers(self.request.headers)
  78. session_info['user_ip'] = self.request.remote_ip
  79. session_info['request'] = self.request
  80. session_info['backend'] = 'tornado'
  81. app_name = self.get_query_argument('app', 'index')
  82. application = applications.get(app_name) or applications['index']
  83. if iscoroutinefunction(application) or isgeneratorfunction(application):
  84. self.session = CoroutineBasedSession(application, session_info=session_info,
  85. on_task_command=self.send_msg_to_client,
  86. on_session_close=self.close_from_session)
  87. else:
  88. self.session = ThreadBasedSession(application, session_info=session_info,
  89. on_task_command=self.send_msg_to_client,
  90. on_session_close=self.close_from_session,
  91. loop=asyncio.get_event_loop())
  92. def on_message(self, message):
  93. data = json.loads(message)
  94. if data is not None:
  95. self.session.send_client_event(data)
  96. def close_from_session(self):
  97. self._close_from_session_tag = True
  98. self.close()
  99. def on_close(self):
  100. # Session.close() is called only when connection is closed from the client.
  101. # 只有在由客户端主动断开连接时,才调用 session.close()
  102. if not self._close_from_session_tag:
  103. self.session.close()
  104. logger.debug("WebSocket closed")
  105. return WSHandler
  106. def webio_handler(applications, cdn=True, allowed_origins=None, check_origin=None):
  107. """Get the ``RequestHandler`` class for running PyWebIO applications in Tornado.
  108. The ``RequestHandler`` communicates with the browser by WebSocket protocol.
  109. The arguments of ``webio_handler()`` have the same meaning as for :func:`pywebio.platform.tornado.start_server`
  110. """
  111. applications = make_applications(applications)
  112. for target in applications.values():
  113. register_session_implement_for_target(target)
  114. cdn = cdn_validation(cdn, 'error')
  115. if check_origin is None:
  116. check_origin_func = partial(_check_origin, allowed_origins=allowed_origins or [])
  117. else:
  118. check_origin_func = lambda origin, handler: _is_same_site(origin, handler) or check_origin(origin)
  119. return _webio_handler(applications=applications, cdn=cdn, check_origin_func=check_origin_func)
  120. async def open_webbrowser_on_server_started(host, port):
  121. url = 'http://%s:%s' % (host, port)
  122. is_open = await wait_host_port(host, port, duration=20)
  123. if is_open:
  124. logger.info('Try open %s in web browser' % url)
  125. webbrowser.open(url)
  126. else:
  127. logger.error('Open %s failed.' % url)
  128. def _setup_server(webio_handler, port=0, host='', **tornado_app_settings):
  129. if port == 0:
  130. port = get_free_port()
  131. handlers = [(r"/", webio_handler),
  132. (r"/(.*)", StaticFileHandler, {"path": STATIC_PATH, 'default_filename': 'index.html'})]
  133. app = tornado.web.Application(handlers=handlers, **tornado_app_settings)
  134. server = app.listen(port, address=host)
  135. return server, port
  136. def start_server(applications, port=0, host='',
  137. debug=False, cdn=True,
  138. allowed_origins=None, check_origin=None,
  139. auto_open_webbrowser=False,
  140. websocket_max_message_size=None,
  141. websocket_ping_interval=None,
  142. websocket_ping_timeout=None,
  143. **tornado_app_settings):
  144. """Start a Tornado server to provide the PyWebIO application as a web service.
  145. Tornado is the default backend server for PyWebIO applications,
  146. and ``start_server`` can be imported directly using ``from pywebio import start_server``.
  147. :param list/dict/callable applications: PyWebIO application.
  148. Can be a task function, a list of functions, or a dictionary.
  149. When it is a dictionary, whose key is task name and value is task function.
  150. When it is a list, using function name as task name.
  151. You can select the task to run through the ``app`` URL parameter (for example, visit ``http://host:port/?app=foo`` to run the ``foo`` task),
  152. By default, the ``index`` task function is used. When the ``index`` task does not exist, PyWebIO will provide a default index home page.
  153. See also :ref:`Server mode <server_and_script_mode>`
  154. When the task function is a coroutine function, use :ref:`Coroutine-based session <coroutine_based_session>` implementation,
  155. otherwise, use thread-based session implementation.
  156. :param int port: The port the server listens on.
  157. When set to ``0``, the server will automatically select a available port.
  158. :param str host: The host the server listens on. ``host`` may be either an IP address or hostname. If it’s a hostname, the server will listen on all IP addresses associated with the name. ``host`` may be an empty string or None to listen on all available interfaces.
  159. :param bool debug: Tornado Server's debug mode. If enabled, the server will automatically reload for code changes.
  160. See `tornado doc <https://www.tornadoweb.org/en/stable/guide/running.html#debug-mode>`_ for more detail.
  161. :param bool/str cdn: Whether to load front-end static resources from CDN, the default is ``True``.
  162. Can also use a string to directly set the url of PyWebIO static resources.
  163. :param list allowed_origins: The allowed request source list. (The current server host is always allowed)
  164. The source contains the protocol, domain name, and port part.
  165. Can use Unix shell-style wildcards:
  166. - ``*`` matches everything
  167. - ``?`` matches any single character
  168. - ``[seq]`` matches any character in *seq*
  169. - ``[!seq]`` matches any character not in *seq*
  170. Such as: ``https://*.example.com`` 、 ``*://*.example.com``
  171. For detail, see `Python Doc <https://docs.python.org/zh-tw/3/library/fnmatch.html>`_
  172. :param callable check_origin: The validation function for request source.
  173. It receives the source string (which contains protocol, host, and port parts) as parameter and return ``True/False`` to indicate that the server accepts/rejects the request.
  174. If ``check_origin`` is set, the ``allowed_origins`` parameter will be ignored.
  175. :param bool auto_open_webbrowser: Whether or not auto open web browser when server is started (if the operating system allows it) .
  176. :param int websocket_max_message_size: Max bytes of a message which Tornado can accept.
  177. Messages larger than the ``websocket_max_message_size`` (default 10MiB) will not be accepted.
  178. :param int websocket_ping_interval: If set to a number, all websockets will be pinged every n seconds.
  179. This can help keep the connection alive through certain proxy servers which close idle connections,
  180. and it can detect if the websocket has failed without being properly closed.
  181. :param int websocket_ping_timeout: If the ping interval is set, and the server doesn’t receive a ‘pong’
  182. in this many seconds, it will close the websocket. The default is three times the ping interval,
  183. with a minimum of 30 seconds. Ignored if ``websocket_ping_interval`` is not set.
  184. :param tornado_app_settings: Additional keyword arguments passed to the constructor of ``tornado.web.Application``.
  185. For details, please refer: https://www.tornadoweb.org/en/stable/web.html#tornado.web.Application.settings
  186. """
  187. kwargs = locals()
  188. global _ioloop
  189. _ioloop = tornado.ioloop.IOLoop.current()
  190. app_options = ['debug', 'websocket_max_message_size', 'websocket_ping_interval', 'websocket_ping_timeout']
  191. for opt in app_options:
  192. if kwargs[opt] is not None:
  193. tornado_app_settings[opt] = kwargs[opt]
  194. cdn = cdn_validation(cdn, 'warn')
  195. handler = webio_handler(applications, cdn, allowed_origins=allowed_origins, check_origin=check_origin)
  196. _, port = _setup_server(webio_handler=handler, port=port, host=host, **tornado_app_settings)
  197. print('Listen on %s:%s' % (host or '0.0.0.0', port))
  198. if auto_open_webbrowser:
  199. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, host or 'localhost', port)
  200. tornado.ioloop.IOLoop.current().start()
  201. def start_server_in_current_thread_session():
  202. """启动 script mode 的server,监听可用端口,并自动打开浏览器
  203. Start the server for script mode, and automatically open the browser when the server port is available.
  204. PYWEBIO_SCRIPT_MODE_PORT环境变量可以设置监听端口,并关闭自动打开浏览器,用于测试
  205. The PYWEBIO_SCRIPT_MODE_PORT environment variable can set the listening port, just used in testing.
  206. """
  207. websocket_conn_opened = threading.Event()
  208. thread = threading.current_thread()
  209. mock_apps = dict(index=lambda: None)
  210. class SingleSessionWSHandler(_webio_handler(applications=mock_apps, cdn=False)):
  211. session = None
  212. instance = None
  213. def open(self):
  214. self.main_session = False
  215. if SingleSessionWSHandler.session is None:
  216. self.main_session = True
  217. SingleSessionWSHandler.instance = self
  218. session_info = get_session_info_from_headers(self.request.headers)
  219. session_info['user_ip'] = self.request.remote_ip
  220. session_info['request'] = self.request
  221. session_info['backend'] = 'tornado'
  222. SingleSessionWSHandler.session = ScriptModeSession(thread, session_info=session_info,
  223. on_task_command=self.send_msg_to_client,
  224. loop=asyncio.get_event_loop())
  225. websocket_conn_opened.set()
  226. else:
  227. self.close()
  228. def on_close(self):
  229. if SingleSessionWSHandler.session is not None and self.main_session:
  230. self.session.close()
  231. logger.debug('ScriptModeSession closed')
  232. async def wait_to_stop_loop(server):
  233. """当只剩当前线程和Daemon线程运行时,关闭Server
  234. When only the current thread and Daemon thread are running, close the Server"""
  235. # 包括当前线程在内的非Daemon线程数
  236. # The number of non-Daemon threads(including the current thread)
  237. alive_none_daemonic_thread_cnt = None
  238. while alive_none_daemonic_thread_cnt != 1:
  239. alive_none_daemonic_thread_cnt = sum(
  240. 1 for t in threading.enumerate() if t.is_alive() and not t.isDaemon()
  241. )
  242. await asyncio.sleep(1)
  243. # 关闭Websocket连接
  244. # Close the Websocket connection
  245. if SingleSessionWSHandler.instance:
  246. SingleSessionWSHandler.instance.close()
  247. server.stop()
  248. logger.debug('Closing tornado ioloop...')
  249. tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task() and not t.done()]
  250. for task in tasks: task.cancel()
  251. # 必须需要 await asyncio.sleep ,否则上方 task.cancel() 调用无法调度生效
  252. # This line must be required, otherwise the `task.cancel()` call cannot be scheduled to take effect
  253. await asyncio.sleep(0)
  254. tornado.ioloop.IOLoop.current().stop()
  255. def server_thread():
  256. from tornado.log import access_log, app_log, gen_log
  257. access_log.setLevel(logging.ERROR)
  258. app_log.setLevel(logging.ERROR)
  259. gen_log.setLevel(logging.ERROR)
  260. loop = asyncio.new_event_loop()
  261. asyncio.set_event_loop(loop)
  262. global _ioloop
  263. _ioloop = tornado.ioloop.IOLoop.current()
  264. port = 0
  265. if os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"):
  266. port = int(os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"))
  267. server, port = _setup_server(webio_handler=SingleSessionWSHandler, port=port, host='localhost')
  268. tornado.ioloop.IOLoop.current().spawn_callback(partial(wait_to_stop_loop, server=server))
  269. if "PYWEBIO_SCRIPT_MODE_PORT" not in os.environ:
  270. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, 'localhost', port)
  271. tornado.ioloop.IOLoop.current().start()
  272. logger.debug('Tornado server exit')
  273. t = threading.Thread(target=server_thread, name='Tornado-server')
  274. t.start()
  275. websocket_conn_opened.wait()