tornado.py 18 KB


  1. import asyncio
  2. import fnmatch
  3. import json
  4. import logging
  5. import os
  6. import threading
  7. import typing
  8. import webbrowser
  9. from functools import partial
  10. from typing import Callable, Dict, List, Optional, Union
  11. from urllib.parse import urlparse
  12. import tornado
  13. import tornado.httpserver
  14. import tornado.ioloop
  15. import tornado.web
  16. import tornado.websocket
  17. from . import page
  18. from .adaptor import ws as ws_adaptor
  19. from .page import make_applications, render_page
  20. from .remote_access import start_remote_access_service
  21. from .utils import cdn_validation, print_listen_address, deserialize_binary_event
  22. from ..session import ScriptModeSession, register_session_implement_for_target, Session
  23. from ..session.base import get_session_info_from_headers
  24. from ..utils import get_free_port, wait_host_port, STATIC_PATH, check_webio_js, parse_file_size
  25. logger = logging.getLogger(__name__)
  26. _ioloop = None
  27. def set_ioloop(loop):
  28. global _ioloop
  29. _ioloop = loop
  30. def ioloop() -> tornado.ioloop.IOLoop:
  31. """获得运行Tornado server的IOLoop
  32. 本方法当前仅在显示boken app时使用
  33. This method is currently only used when displaying boken app"""
  34. global _ioloop
  35. return _ioloop
  36. def _check_origin(origin, allowed_origins, handler: tornado.websocket.WebSocketHandler):
  37. if _is_same_site(origin, handler):
  38. return True
  39. return any(
  40. fnmatch.fnmatch(origin, pattern)
  41. for pattern in allowed_origins
  42. )
  43. def _is_same_site(origin, handler: tornado.websocket.WebSocketHandler):
  44. parsed_origin = urlparse(origin)
  45. origin = parsed_origin.netloc
  46. origin = origin.lower()
  47. host = handler.request.headers.get("Host")
  48. # Check to see that origin matches host directly, including ports
  49. return origin == host
  50. class WebSocketConnection(ws_adaptor.WebSocketConnection):
  51. def __init__(self, context: tornado.websocket.WebSocketHandler):
  52. self.context = context
  53. def get_query_argument(self, name) -> typing.Optional[str]:
  54. return self.context.get_query_argument(name, None)
  55. def make_session_info(self) -> dict:
  56. session_info = get_session_info_from_headers(self.context.request.headers)
  57. session_info['user_ip'] = self.context.request.remote_ip
  58. session_info['request'] = self.context.request
  59. session_info['backend'] = 'tornado'
  60. session_info['protocol'] = 'websocket'
  61. return session_info
  62. def write_message(self, message: dict):
  63. self.context.write_message(json.dumps(message))
  64. def closed(self) -> bool:
  65. return not bool(self.context.ws_connection)
  66. def close(self):
  67. self.context.close()
  68. def _webio_handler(applications=None, cdn=True, reconnect_timeout=0, check_origin_func=_is_same_site) \
  69. -> tornado.websocket.WebSocketHandler: # noqa: C901
  70. """
  71. :param dict applications: dict of `name -> task function`
  72. :param bool/str cdn: Whether to load front-end static resources from CDN
  73. :param callable check_origin_func: check_origin_func(origin, handler) -> bool
  74. :return: Tornado RequestHandler class
  75. """
  76. check_webio_js()
  77. if applications is None:
  78. applications = dict(index=lambda: None) # mock PyWebIO app
  79. ws_adaptor.set_expire_second(reconnect_timeout)
  80. tornado.ioloop.IOLoop.current().spawn_callback(ws_adaptor.session_clean_task)
  81. class Handler(tornado.websocket.WebSocketHandler):
  82. def get_app(self):
  83. app_name = self.get_query_argument('app', 'index')
  84. app = applications.get(app_name) or applications['index']
  85. return app
  86. def get_cdn(self):
  87. if cdn is True and self.get_query_argument('_pywebio_cdn', '') == 'false':
  88. return False
  89. return cdn
  90. async def get(self, *args, **kwargs) -> None:
  91. """http GET request"""
  92. if self.request.headers.get("Upgrade", "").lower() != "websocket":
  93. # Backward compatible
  94. # Frontend detect whether the backend is http server
  95. if self.get_query_argument('test', ''):
  96. return self.write('')
  97. app = self.get_app()
  98. html = render_page(app, protocol='ws', cdn=self.get_cdn())
  99. return self.write(html)
  100. else:
  101. await super().get()
  102. def check_origin(self, origin):
  103. return check_origin_func(origin=origin, handler=self)
  104. def get_compression_options(self):
  105. # Non-None enables compression with default options.
  106. return {}
  107. _handler: ws_adaptor.WebSocketHandler
  108. def open(self):
  109. conn = WebSocketConnection(self)
  110. self._handler = ws_adaptor.WebSocketHandler(
  111. connection=conn, application=self.get_app(), reconnectable=bool(reconnect_timeout)
  112. )
  113. def on_message(self, message):
  114. self._handler.send_client_data(message)
  115. def on_close(self):
  116. self._handler.notify_connection_lost()
  117. return Handler
  118. def webio_handler(applications, cdn=True, reconnect_timeout=0, allowed_origins=None, check_origin=None):
  119. """Get the ``RequestHandler`` class for running PyWebIO applications in Tornado.
  120. The ``RequestHandler`` communicates with the browser by WebSocket protocol.
  121. The arguments of ``webio_handler()`` have the same meaning as for :func:`pywebio.platform.tornado.start_server`
  122. """
  123. applications = make_applications(applications)
  124. for target in applications.values():
  125. register_session_implement_for_target(target)
  126. cdn = cdn_validation(cdn, 'error') # if CDN is not available, raise error
  127. if check_origin is None:
  128. check_origin_func = partial(_check_origin, allowed_origins=allowed_origins or [])
  129. else:
  130. check_origin_func = lambda origin, handler: _is_same_site(origin, handler) or check_origin(origin)
  131. return _webio_handler(applications=applications, cdn=cdn, check_origin_func=check_origin_func,
  132. reconnect_timeout=reconnect_timeout)
  133. async def open_webbrowser_on_server_started(host, port):
  134. url = 'http://%s:%s' % (host, port)
  135. is_open = await wait_host_port(host, port, duration=20)
  136. if is_open:
  137. logger.info('Try open %s in web browser' % url)
  138. # webbrowser.open() may block, so invoke it in thread
  139. threading.Thread(target=webbrowser.open, args=(url,), daemon=True).start()
  140. else:
  141. logger.error('Open %s in web browser failed.' % url)
  142. def _setup_server(webio_handler, port=0, host='', static_dir=None, max_buffer_size=2 ** 20 * 200,
  143. **tornado_app_settings):
  144. if port == 0:
  145. port = get_free_port()
  146. handlers = [(r"/", webio_handler)]
  147. if static_dir is not None:
  148. handlers.append((r"/static/(.*)", tornado.web.StaticFileHandler, {"path": static_dir}))
  149. handlers.append((r"/(.*)", tornado.web.StaticFileHandler, {"path": STATIC_PATH, 'default_filename': 'index.html'}))
  150. app = tornado.web.Application(handlers=handlers, **tornado_app_settings)
  151. # Credit: https://stackoverflow.com/questions/19074972/content-length-too-long-when-uploading-file-using-tornado
  152. server = app.listen(port, address=host, max_buffer_size=max_buffer_size)
  153. return server, port
  154. def start_server(applications: Union[Callable[[], None], List[Callable[[], None]], Dict[str, Callable[[], None]]],
  155. port: int = 0, host: str = '', debug: bool = False, cdn: Union[bool, str] = True,
  156. static_dir: Optional[str] = None, remote_access: bool = False, reconnect_timeout: int = 0,
  157. allowed_origins: Optional[List[str]] = None, check_origin: Callable[[str], bool] = None,
  158. auto_open_webbrowser: bool = False, max_payload_size: Union[int, str] = '200M',
  159. **tornado_app_settings):
  160. """Start a Tornado server to provide the PyWebIO application as a web service.
  161. The Tornado server communicates with the browser by WebSocket protocol.
  162. Tornado is the default backend server for PyWebIO applications,
  163. and ``start_server`` can be imported directly using ``from pywebio import start_server``.
  164. :param list/dict/callable applications: PyWebIO application.
  165. Can be a task function, a list of functions, or a dictionary.
  166. Refer to :ref:`Advanced topic: Multiple applications in start_server() <multiple_app>` for more information.
  167. When the task function is a coroutine function, use :ref:`Coroutine-based session <coroutine_based_session>` implementation,
  168. otherwise, use thread-based session implementation.
  169. :param int port: The port the server listens on.
  170. When set to ``0``, the server will automatically select a available port.
  171. :param str host: The host the server listens on. ``host`` may be either an IP address or hostname.
  172. If it’s a hostname, the server will listen on all IP addresses associated with the name.
  173. ``host`` may be an empty string or None to listen on all available interfaces.
  174. :param bool debug: Tornado Server's debug mode. If enabled, the server will automatically reload for code changes.
  175. See `tornado doc <https://www.tornadoweb.org/en/stable/guide/running.html#debug-mode>`_ for more detail.
  176. :param bool/str cdn: Whether to load front-end static resources from CDN, the default is ``True``.
  177. Can also use a string to directly set the url of PyWebIO static resources.
  178. :param str static_dir: The directory to store the application static files.
  179. The files in this directory can be accessed via ``http://<host>:<port>/static/files``.
  180. For example, if there is a ``A/B.jpg`` file in ``static_dir`` path,
  181. it can be accessed via ``http://<host>:<port>/static/A/B.jpg``.
  182. :param bool remote_access: Whether to enable remote access, when enabled,
  183. you can get a temporary public network access address for the current application,
  184. others can access your application via this address.
  185. :param bool auto_open_webbrowser: Whether or not auto open web browser when server is started (if the operating system allows it) .
  186. :param int reconnect_timeout: The client can reconnect to server within ``reconnect_timeout`` seconds after an unexpected disconnection.
  187. If set to 0 (default), once the client disconnects, the server session will be closed.
  188. :param list allowed_origins: The allowed request source list. (The current server host is always allowed)
  189. The source contains the protocol, domain name, and port part.
  190. Can use Unix shell-style wildcards:
  191. - ``*`` matches everything
  192. - ``?`` matches any single character
  193. - ``[seq]`` matches any character in *seq*
  194. - ``[!seq]`` matches any character not in *seq*
  195. Such as: ``https://*.example.com`` 、 ``*://*.example.com``
  196. For detail, see `Python Doc <https://docs.python.org/zh-tw/3/library/fnmatch.html>`_
  197. :param callable check_origin: The validation function for request source.
  198. It receives the source string (which contains protocol, host, and port parts) as parameter and
  199. return ``True/False`` to indicate that the server accepts/rejects the request.
  200. If ``check_origin`` is set, the ``allowed_origins`` parameter will be ignored.
  201. :param bool auto_open_webbrowser: Whether or not auto open web browser when server is started (if the operating system allows it) .
  202. :param int/str max_payload_size: Max size of a websocket message which Tornado can accept.
  203. Messages larger than the ``max_payload_size`` (default 200MB) will not be accepted.
  204. ``max_payload_size`` can be a integer indicating the number of bytes, or a string ending with `K` / `M` / `G`
  205. (representing kilobytes, megabytes, and gigabytes, respectively).
  206. E.g: ``500``, ``'40K'``, ``'3M'``
  207. :param tornado_app_settings: Additional keyword arguments passed to the constructor of ``tornado.web.Application``.
  208. For details, please refer: https://www.tornadoweb.org/en/stable/web.html#tornado.web.Application.settings
  209. """
  210. set_ioloop(tornado.ioloop.IOLoop.current()) # to enable bokeh app
  211. cdn = cdn_validation(cdn, 'warn') # if CDN is not available, warn user and disable CDN
  212. page.MAX_PAYLOAD_SIZE = max_payload_size = parse_file_size(max_payload_size)
  213. # covered `os.environ.get()` func with `bool()` to prevent type check error
  214. debug = Session.debug = bool(os.environ.get('PYWEBIO_DEBUG', debug))
  215. # Since some cloud server may close idle connections (such as heroku),
  216. # use `websocket_ping_interval` to keep the connection alive
  217. tornado_app_settings.setdefault('websocket_ping_interval', 30)
  218. tornado_app_settings.setdefault('websocket_max_message_size', max_payload_size) # Backward compatible
  219. tornado_app_settings['websocket_max_message_size'] = parse_file_size(
  220. tornado_app_settings['websocket_max_message_size'])
  221. tornado_app_settings['debug'] = debug
  222. handler = webio_handler(applications, cdn, allowed_origins=allowed_origins, check_origin=check_origin,
  223. reconnect_timeout=reconnect_timeout)
  224. _, port = _setup_server(webio_handler=handler, port=port, host=host, static_dir=static_dir,
  225. max_buffer_size=max_payload_size, **tornado_app_settings)
  226. print_listen_address(host, port)
  227. if auto_open_webbrowser:
  228. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, host or '127.0.0.1', port)
  229. if remote_access:
  230. start_remote_access_service(local_port=port)
  231. tornado.ioloop.IOLoop.current().start()
  232. def start_server_in_current_thread_session():
  233. """启动 script mode 的server,监听可用端口,并自动打开浏览器
  234. Start the server for script mode, and automatically open the browser when the server port is available.
  235. PYWEBIO_SCRIPT_MODE_PORT环境变量可以设置监听端口,并关闭自动打开浏览器,用于测试
  236. The PYWEBIO_SCRIPT_MODE_PORT environment variable can set the listening port, just used in testing.
  237. """
  238. websocket_conn_opened = threading.Event()
  239. thread = threading.current_thread()
  240. class SingleSessionWSHandler(_webio_handler(cdn=False)):
  241. session: ScriptModeSession = None
  242. instance: typing.ClassVar = None
  243. closed = False
  244. def send_msg_to_client(self, session):
  245. for msg in session.get_task_commands():
  246. try:
  247. self.write_message(json.dumps(msg))
  248. except TypeError as e:
  249. logger.exception('Data serialization error: %s\n'
  250. 'This may be because you pass the wrong type of parameter to the function'
  251. ' of PyWebIO.\nData content: %s', e, msg)
  252. def open(self):
  253. if SingleSessionWSHandler.session is None:
  254. SingleSessionWSHandler.instance = self
  255. session_info = get_session_info_from_headers(self.request.headers)
  256. session_info['user_ip'] = self.request.remote_ip
  257. session_info['request'] = self.request
  258. session_info['backend'] = 'tornado'
  259. session_info['protocol'] = 'websocket'
  260. self.session = SingleSessionWSHandler.session = ScriptModeSession(
  261. thread, session_info=session_info,
  262. on_task_command=self.send_msg_to_client,
  263. loop=asyncio.get_event_loop())
  264. websocket_conn_opened.set()
  265. else:
  266. self.close()
  267. def on_message(self, data):
  268. if isinstance(data, bytes):
  269. event = deserialize_binary_event(data)
  270. else:
  271. event = json.loads(data)
  272. if event is None:
  273. return
  274. self.session.send_client_event(event)
  275. def on_close(self):
  276. if self.session is not None:
  277. self.session.close()
  278. self.closed = True
  279. logger.debug('ScriptModeSession closed')
  280. async def wait_to_stop_loop(server):
  281. """当只剩当前线程和Daemon线程运行时,关闭Server
  282. When only the current thread and Daemon thread are running, close the Server"""
  283. # 包括当前线程在内的非Daemon线程数
  284. # The number of non-Daemon threads(including the current thread)
  285. alive_none_daemonic_thread_cnt = None
  286. while alive_none_daemonic_thread_cnt != 1:
  287. alive_none_daemonic_thread_cnt = sum(
  288. 1 for t in threading.enumerate() if t.is_alive() and not t.isDaemon()
  289. )
  290. await asyncio.sleep(0.5)
  291. if SingleSessionWSHandler.session and SingleSessionWSHandler.session.need_keep_alive():
  292. while not SingleSessionWSHandler.instance.closed:
  293. await asyncio.sleep(0.5)
  294. # 关闭Websocket连接
  295. # Close the Websocket connection
  296. if SingleSessionWSHandler.instance:
  297. SingleSessionWSHandler.instance.close()
  298. server.stop()
  299. logger.debug('Closing tornado ioloop...')
  300. tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task() and not t.done()]
  301. for task in tasks:
  302. task.cancel()
  303. # 必须需要 await asyncio.sleep ,否则上方 task.cancel() 调用无法调度生效
  304. # This line must be required, otherwise the `task.cancel()` call cannot be scheduled to take effect
  305. await asyncio.sleep(0)
  306. tornado.ioloop.IOLoop.current().stop()
  307. def server_thread():
  308. from tornado.log import access_log, app_log, gen_log
  309. access_log.setLevel(logging.ERROR)
  310. app_log.setLevel(logging.ERROR)
  311. gen_log.setLevel(logging.ERROR)
  312. loop = asyncio.new_event_loop()
  313. asyncio.set_event_loop(loop)
  314. set_ioloop(tornado.ioloop.IOLoop.current()) # to enable bokeh app
  315. port = 0
  316. if os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"):
  317. port = int(os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"))
  318. server, port = _setup_server(webio_handler=SingleSessionWSHandler, port=port, host='127.0.0.1',
  319. websocket_max_message_size=parse_file_size('200M'))
  320. tornado.ioloop.IOLoop.current().spawn_callback(partial(wait_to_stop_loop, server=server))
  321. if "PYWEBIO_SCRIPT_MODE_PORT" not in os.environ:
  322. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, '127.0.0.1', port)
  323. tornado.ioloop.IOLoop.current().start()
  324. logger.debug('Tornado server exit')
  325. t = threading.Thread(target=server_thread, name='Tornado-server')
  326. t.start()
  327. websocket_conn_opened.wait()