tornado.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326
  1. import asyncio
  2. import fnmatch
  3. import json
  4. import logging
  5. import os
  6. import threading
  7. import webbrowser
  8. from functools import partial
  9. from urllib.parse import urlparse
  10. import tornado
  11. import tornado.httpserver
  12. import tornado.ioloop
  13. from tornado.web import StaticFileHandler
  14. from tornado.websocket import WebSocketHandler
  15. from ..session import CoroutineBasedSession, ThreadBasedSession, ScriptModeSession, \
  16. register_session_implement_for_target, Session
  17. from ..session.base import get_session_info_from_headers
  18. from ..utils import get_free_port, wait_host_port, STATIC_PATH, iscoroutinefunction, isgeneratorfunction
  19. from .utils import make_applications, render_page
  20. logger = logging.getLogger(__name__)
  21. _ioloop = None
  22. def ioloop() -> tornado.ioloop.IOLoop:
  23. """获得运行Tornado server的IOLoop"""
  24. global _ioloop
  25. return _ioloop
  26. def _check_origin(origin, allowed_origins, handler: WebSocketHandler):
  27. if _is_same_site(origin, handler):
  28. return True
  29. return any(
  30. fnmatch.fnmatch(origin, patten)
  31. for patten in allowed_origins
  32. )
  33. def _is_same_site(origin, handler: WebSocketHandler):
  34. parsed_origin = urlparse(origin)
  35. origin = parsed_origin.netloc
  36. origin = origin.lower()
  37. host = handler.request.headers.get("Host")
  38. # Check to see that origin matches host directly, including ports
  39. return origin == host
  40. def _webio_handler(applications, check_origin_func=_is_same_site):
  41. """获取用于Tornado进行整合的RequestHandler类
  42. :param dict applications: 任务名->任务函数 的字典
  43. :param callable check_origin_func: check_origin_func(origin, handler) -> bool
  44. :return: Tornado RequestHandler类
  45. """
  46. class WSHandler(WebSocketHandler):
  47. async def get(self, *args, **kwargs) -> None:
  48. # It's a simple http GET request
  49. if self.request.headers.get("Upgrade", "").lower() != "websocket":
  50. # Backward compatible
  51. if self.get_query_argument('test', ''):
  52. return self.write('')
  53. app_name = self.get_query_argument('app', 'index')
  54. app = applications.get(app_name) or applications['index']
  55. html = render_page(app, protocol='ws')
  56. return self.write(html)
  57. else:
  58. await super().get()
  59. def check_origin(self, origin):
  60. return check_origin_func(origin=origin, handler=self)
  61. def get_compression_options(self):
  62. # Non-None enables compression with default options.
  63. return {}
  64. def send_msg_to_client(self, session: Session):
  65. for msg in session.get_task_commands():
  66. self.write_message(json.dumps(msg))
  67. def open(self):
  68. logger.debug("WebSocket opened")
  69. # self.set_nodelay(True)
  70. self._close_from_session_tag = False # 由session主动关闭连接
  71. session_info = get_session_info_from_headers(self.request.headers)
  72. session_info['user_ip'] = self.request.remote_ip
  73. session_info['request'] = self.request
  74. session_info['backend'] = 'tornado'
  75. app_name = self.get_query_argument('app', 'index')
  76. application = applications.get(app_name) or applications['index']
  77. if iscoroutinefunction(application) or isgeneratorfunction(application):
  78. self.session = CoroutineBasedSession(application, session_info=session_info,
  79. on_task_command=self.send_msg_to_client,
  80. on_session_close=self.close_from_session)
  81. else:
  82. self.session = ThreadBasedSession(application, session_info=session_info,
  83. on_task_command=self.send_msg_to_client,
  84. on_session_close=self.close_from_session,
  85. loop=asyncio.get_event_loop())
  86. def on_message(self, message):
  87. data = json.loads(message)
  88. if data is not None:
  89. self.session.send_client_event(data)
  90. def close_from_session(self):
  91. self._close_from_session_tag = True
  92. self.close()
  93. def on_close(self):
  94. if not self._close_from_session_tag: # 只有在由客户端主动断开连接时,才调用 session.close()
  95. self.session.close()
  96. logger.debug("WebSocket closed")
  97. return WSHandler
  98. def webio_handler(applications, allowed_origins=None, check_origin=None):
  99. """获取在Tornado中运行PyWebIO应用的RequestHandler类。RequestHandler类基于WebSocket协议与浏览器进行通讯。
  100. :param callable/list/dict applications: PyWebIO应用。
  101. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  102. :param callable check_origin: 请求来源检查函数。
  103. 关于各参数的详细说明见 :func:`pywebio.platform.tornado.start_server` 的同名参数。
  104. :return: Tornado RequestHandler类
  105. """
  106. applications = make_applications(applications)
  107. for target in applications.values():
  108. register_session_implement_for_target(target)
  109. if check_origin is None:
  110. check_origin_func = partial(_check_origin, allowed_origins=allowed_origins or [])
  111. else:
  112. check_origin_func = lambda origin, handler: _is_same_site(origin, handler) or check_origin(origin)
  113. return _webio_handler(applications=applications, check_origin_func=check_origin_func)
  114. async def open_webbrowser_on_server_started(host, port):
  115. url = 'http://%s:%s' % (host, port)
  116. is_open = await wait_host_port(host, port, duration=20)
  117. if is_open:
  118. logger.info('Try open %s in web browser' % url)
  119. webbrowser.open(url)
  120. else:
  121. logger.error('Open %s failed.' % url)
  122. def _setup_server(webio_handler, port=0, host='', **tornado_app_settings):
  123. if port == 0:
  124. port = get_free_port()
  125. handlers = [(r"/", webio_handler),
  126. (r"/(.*)", StaticFileHandler, {"path": STATIC_PATH, 'default_filename': 'index.html'})]
  127. app = tornado.web.Application(handlers=handlers, **tornado_app_settings)
  128. server = app.listen(port, address=host)
  129. return server, port
  130. def start_server(applications, port=0, host='', debug=False,
  131. allowed_origins=None, check_origin=None,
  132. auto_open_webbrowser=False,
  133. websocket_max_message_size=None,
  134. websocket_ping_interval=None,
  135. websocket_ping_timeout=None,
  136. **tornado_app_settings):
  137. """启动一个 Tornado server 将PyWebIO应用作为Web服务提供。
  138. Tornado为PyWebIO应用的默认后端Server,可以直接使用 ``from pywebio import start_server`` 导入。
  139. :param list/dict/callable applications: PyWebIO应用. 可以是任务函数或者任务函数的字典或列表。
  140. 类型为字典时,字典键为任务名,类型为列表时,函数名为任务名。
  141. 可以通过 ``app`` URL参数选择要运行的任务(例如访问 ``http://host:port/?app=foo`` 来运行 ``foo`` 任务),
  142. 默认使用运行 ``index`` 任务函数,当 ``index`` 任务不存在时,PyWebIO会提供一个默认的索引页作为主页。
  143. 参见 :ref:`Server模式 <server_and_script_mode>`
  144. 任务函数为协程函数时,使用 :ref:`基于协程的会话实现 <coroutine_based_session>` ;任务函数为普通函数时,使用基于线程的会话实现。
  145. :param int port: 服务监听的端口。设置为 ``0`` 时,表示自动选择可用端口。
  146. :param str host: 服务绑定的地址。 ``host`` 可以是IP地址或者为hostname。如果为hostname,服务会监听所有与该hostname关联的IP地址。
  147. 通过设置 ``host`` 为空字符串或 ``None`` 来将服务绑定到所有可用的地址上。
  148. :param bool debug: 是否开启Tornado Server的debug模式,开启后,代码发生修改后服务器会自动重启。
  149. 详情请参阅 `tornado 文档 <https://www.tornadoweb.org/en/stable/guide/running.html#debug-mode>`_
  150. :param list allowed_origins: 除当前域名外,服务器还允许的请求的来源列表。
  151. 来源包含协议、域名和端口部分,允许使用 Unix shell 风格的匹配模式(全部规则参见 `Python文档 <https://docs.python.org/zh-tw/3/library/fnmatch.html>`_ ):
  152. - ``*`` 为通配符
  153. - ``?`` 匹配单个字符
  154. - ``[seq]`` 匹配seq中的任何字符
  155. - ``[!seq]`` 匹配任何不在seq中的字符
  156. 比如 ``https://*.example.com`` 、 ``*://*.example.com``
  157. :param callable check_origin: 请求来源检查函数。接收请求来源(包含协议、域名和端口部分)字符串作为参数,
  158. 返回 ``True/False`` 指示服务器接受/拒绝该请求。若设置了 ``check_origin`` , ``allowed_origins`` 参数将被忽略
  159. :param bool auto_open_webbrowser: 当服务启动后,是否自动打开浏览器来访问服务。(该操作需要操作系统支持)
  160. :param int websocket_max_message_size: Tornado Server最大可接受的WebSockets消息大小。单位为字节,默认为10MiB。
  161. :param int websocket_ping_interval: 当被设置后,服务器会以 ``websocket_ping_interval`` 秒周期性地向每个WebSockets连接发送‘ping‘消息。
  162. 如果应用处在某些反向代理服务器之后,设置 ``websocket_ping_interval`` 可以避免WebSockets连接被代理服务器当作空闲连接而关闭。
  163. 同时,若WebSockets连接在某些情况下被异常关闭,应用也可以及时感知。
  164. :param int websocket_ping_timeout: 如果设置了 ``websocket_ping_interval`` ,而服务没有在发送‘ping‘消息后的 ``websocket_ping_timeout`` 秒
  165. 内收到‘pong’消息,应用会将连接关闭。默认的超时时间为 ``websocket_ping_interval`` 的三倍。
  166. :param tornado_app_settings: 传递给 ``tornado.web.Application`` 构造函数的额外的关键字参数
  167. 可设置项参考: https://www.tornadoweb.org/en/stable/web.html#tornado.web.Application.settings
  168. """
  169. kwargs = locals()
  170. global _ioloop
  171. _ioloop = tornado.ioloop.IOLoop.current()
  172. app_options = ['debug', 'websocket_max_message_size', 'websocket_ping_interval', 'websocket_ping_timeout']
  173. for opt in app_options:
  174. if kwargs[opt] is not None:
  175. tornado_app_settings[opt] = kwargs[opt]
  176. handler = webio_handler(applications, allowed_origins=allowed_origins, check_origin=check_origin)
  177. _, port = _setup_server(webio_handler=handler, port=port, host=host, **tornado_app_settings)
  178. print('Listen on %s:%s' % (host or '0.0.0.0', port))
  179. if auto_open_webbrowser:
  180. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, host or 'localhost', port)
  181. tornado.ioloop.IOLoop.current().start()
  182. def start_server_in_current_thread_session():
  183. """启动 script mode 的server,监听可用端口,并自动打开浏览器
  184. PYWEBIO_SCRIPT_MODE_PORT环境变量可以设置监听端口,并关闭自动打开浏览器,用于测试
  185. """
  186. websocket_conn_opened = threading.Event()
  187. thread = threading.current_thread()
  188. mock_apps = dict(index=lambda: None)
  189. class SingleSessionWSHandler(_webio_handler(applications=mock_apps)):
  190. session = None
  191. instance = None
  192. def open(self):
  193. self.main_session = False
  194. if SingleSessionWSHandler.session is None:
  195. self.main_session = True
  196. SingleSessionWSHandler.instance = self
  197. session_info = get_session_info_from_headers(self.request.headers)
  198. session_info['user_ip'] = self.request.remote_ip
  199. session_info['request'] = self.request
  200. session_info['backend'] = 'tornado'
  201. SingleSessionWSHandler.session = ScriptModeSession(thread, session_info=session_info,
  202. on_task_command=self.send_msg_to_client,
  203. loop=asyncio.get_event_loop())
  204. websocket_conn_opened.set()
  205. else:
  206. self.close()
  207. def on_close(self):
  208. if SingleSessionWSHandler.session is not None and self.main_session:
  209. self.session.close()
  210. logger.debug('ScriptModeSession closed')
  211. async def wait_to_stop_loop(server):
  212. """当只剩当前线程和Daemon线程运行时,关闭Server"""
  213. alive_none_daemonic_thread_cnt = None # 包括当前线程在内的非Daemon线程数
  214. while alive_none_daemonic_thread_cnt != 1:
  215. alive_none_daemonic_thread_cnt = sum(
  216. 1 for t in threading.enumerate() if t.is_alive() and not t.isDaemon()
  217. )
  218. await asyncio.sleep(1)
  219. # 关闭Websocket连接
  220. if SingleSessionWSHandler.instance:
  221. SingleSessionWSHandler.instance.close()
  222. server.stop()
  223. logger.debug('Closing tornado ioloop...')
  224. tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task() and not t.done()]
  225. for task in tasks: task.cancel()
  226. # 必须需要 await asyncio.sleep ,否则 t.cancel() 调用无法调度生效
  227. await asyncio.sleep(0)
  228. tornado.ioloop.IOLoop.current().stop()
  229. def server_thread():
  230. from tornado.log import access_log, app_log, gen_log
  231. access_log.setLevel(logging.ERROR)
  232. app_log.setLevel(logging.ERROR)
  233. gen_log.setLevel(logging.ERROR)
  234. loop = asyncio.new_event_loop()
  235. asyncio.set_event_loop(loop)
  236. global _ioloop
  237. _ioloop = tornado.ioloop.IOLoop.current()
  238. port = 0
  239. if os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"):
  240. port = int(os.environ.get("PYWEBIO_SCRIPT_MODE_PORT"))
  241. server, port = _setup_server(webio_handler=SingleSessionWSHandler, port=port, host='localhost')
  242. tornado.ioloop.IOLoop.current().spawn_callback(partial(wait_to_stop_loop, server=server))
  243. if "PYWEBIO_SCRIPT_MODE_PORT" not in os.environ:
  244. tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, 'localhost', port)
  245. tornado.ioloop.IOLoop.current().start()
  246. logger.debug('Tornado server exit')
  247. t = threading.Thread(target=server_thread, name='Tornado-server')
  248. t.start()
  249. websocket_conn_opened.wait()