tornado.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. import asyncio
  2. import json
  3. import logging
  4. import threading
  5. import webbrowser
  6. import tornado
  7. import tornado.httpserver
  8. import tornado.ioloop
  9. import tornado.websocket
  10. from tornado.web import StaticFileHandler
  11. from ..session import AsyncBasedSession, ThreadBasedWebIOSession, get_session_implement, DesignatedThreadSession, \
  12. mark_server_started
  13. from ..utils import get_free_port, wait_host_port, STATIC_PATH
  14. logger = logging.getLogger(__name__)
  def webio_handler(task_func):
      """Build a Tornado WebSocket handler class that serves ``task_func``.

      Every WebSocket connection creates its own session in ``open``; the session
      class is selected by ``get_session_implement()``.

      :param task_func: task function passed to the session constructor.
      :return: a ``tornado.websocket.WebSocketHandler`` subclass.
      """

      class WSHandler(tornado.websocket.WebSocketHandler):

          def check_origin(self, origin):
              # Accept WebSocket connections regardless of their origin.
              return True

          def get_compression_options(self):
              # Non-None enables compression with default options.
              return {}

          def send_msg_to_client(self, session: AsyncBasedSession):
              """Send every pending task message of ``session`` to the client as JSON."""
              for msg in session.get_task_messages():
                  self.write_message(json.dumps(msg))

          def open(self):
              """Create the session for a newly opened connection."""
              logger.debug("WebSocket opened")
              self.set_nodelay(True)

              # Whether the connection is being closed from the session side.
              self._close_from_session_tag = False

              # Both session kinds register ``close_from_session`` so that ``on_close``
              # can tell a session-initiated close from a client-initiated one and
              # does not call back into a session that triggered the close itself.
              if get_session_implement() is AsyncBasedSession:
                  self.session = AsyncBasedSession(task_func, on_task_message=self.send_msg_to_client,
                                                   on_session_close=self.close_from_session)
              else:
                  self.session = ThreadBasedWebIOSession(task_func, on_task_message=self.send_msg_to_client,
                                                         on_session_close=self.close_from_session,
                                                         loop=asyncio.get_event_loop())

          def on_message(self, message):
              """Decode a JSON message from the client and forward it to the session."""
              data = json.loads(message)
              self.session.send_client_event(data)

          def close_from_session(self):
              """Close the connection on behalf of the session."""
              self._close_from_session_tag = True
              self.close()

          def on_close(self):
              # Only close the session when the close did not originate from it.
              if not self._close_from_session_tag:
                  self.session.close(no_session_close_callback=True)
              logger.debug("WebSocket closed")

      return WSHandler
  async def open_webbrowser_on_server_started(host, port):
      """Open ``http://host:port`` in the web browser once ``wait_host_port`` succeeds.

      Awaits ``wait_host_port(host, port, duration=5, delay=0.5)``; when it returns
      a truthy value the URL is opened with ``webbrowser.open``, otherwise an
      error is logged.

      :param host: host name or address used to build the URL.
      :param port: port number used to build the URL.
      """
      url = 'http://%s:%s' % (host, port)
      reachable = await wait_host_port(host, port, duration=5, delay=0.5)
      if not reachable:
          logger.error('Open %s failed.' % url)
          return
      logger.info('Openning %s' % url)
      webbrowser.open(url)
  def _setup_server(webio_handler, port=0, host='', **tornado_app_settings):
      """Create the Tornado application and start listening.

      The WebSocket handler is mounted at ``/io``; every other path is served as a
      static file from ``STATIC_PATH`` with ``index.html`` as the default file.

      :param webio_handler: WebSocket handler class mounted at ``/io``.
      :param port: port to listen on; ``0`` means ask ``get_free_port()`` for one.
      :param host: address to bind; an empty string is reported as ``0.0.0.0``.
      :param tornado_app_settings: extra keyword arguments for ``tornado.web.Application``.
      :return: ``(server, port)`` - the object returned by ``app.listen`` and the port in use.
      """
      if port == 0:
          port = get_free_port()

      print('Listen on %s:%s' % (host or '0.0.0.0', port))

      io_route = (r"/io", webio_handler)
      static_route = (r"/(.*)", StaticFileHandler, {"path": STATIC_PATH, 'default_filename': 'index.html'})
      app = tornado.web.Application(handlers=[io_route, static_route], **tornado_app_settings)

      return app.listen(port, address=host), port
  def start_server(target, port=0, host='', debug=False,
                   auto_open_webbrowser=False,
                   websocket_max_message_size=None,
                   websocket_ping_interval=None,
                   websocket_ping_timeout=None,
                   **tornado_app_settings):
      """Start a Tornado server that serves the ``target`` function.

      This call runs the current Tornado IOLoop and therefore blocks until the
      loop is stopped.

      :param target: task function. It is a coroutine function when ``AsyncBasedSession`` is used,
          or a plain function when ``ThreadBasedWebIOSession`` is used.
      :param port: port to bind. Set ``0`` to pick a free port automatically.
      :param host: host to bind. May be an IP address or a hostname; for a hostname the server
          listens on all IP addresses associated with the name. Use an empty string to listen
          on all available interfaces.
      :param bool debug: Tornado debug mode.
      :param bool auto_open_webbrowser: whether to open the web browser once the server is started.
      :param int websocket_max_message_size: maximum size in bytes of a message Tornado accepts.
          Messages larger than ``websocket_max_message_size`` (default 10MiB) are not accepted.
      :param int websocket_ping_interval: if set to a number, all websockets are pinged every n seconds.
          This helps keep connections alive through proxy servers that close idle connections,
          and detects websockets that failed without being properly closed.
      :param int websocket_ping_timeout: if the ping interval is set and the server does not receive
          a 'pong' within this many seconds, it closes the websocket. The default is three times the
          ping interval, with a minimum of 30 seconds. Ignored if ``websocket_ping_interval`` is not set.
      :param tornado_app_settings: additional keyword arguments passed to the constructor of
          ``tornado.web.Application``.
          ref: https://www.tornadoweb.org/en/stable/web.html#tornado.web.Application.settings
      :return: None
      """
      mark_server_started()

      # Named options are forwarded to the Tornado application only when they are set.
      named_options = (
          ('debug', debug),
          ('websocket_max_message_size', websocket_max_message_size),
          ('websocket_ping_interval', websocket_ping_interval),
          ('websocket_ping_timeout', websocket_ping_timeout),
      )
      for name, value in named_options:
          if value is not None:
              tornado_app_settings[name] = value

      handler = webio_handler(target)
      _, port = _setup_server(webio_handler=handler, port=port, host=host, **tornado_app_settings)

      io_loop = tornado.ioloop.IOLoop.current()
      if auto_open_webbrowser:
          io_loop.spawn_callback(open_webbrowser_on_server_started, host or 'localhost', port)
      io_loop.start()
  def start_server_in_current_thread_session():
      """Start a local server in a background thread and bind a single session to the calling thread.

      A non-daemon thread named ``Tornado-server`` runs a Tornado IOLoop listening on
      ``localhost`` and schedules the web browser to be opened. The first WebSocket
      connection creates the one and only ``DesignatedThreadSession``; any further
      connection is closed immediately. This function blocks the calling thread
      until that first connection has been opened.
      """
      mark_server_started()

      websocket_conn_opened = threading.Event()
      thread = threading.current_thread()

      class SingleSessionWSHandler(webio_handler(None)):
          # The single session shared by the process; created by the first connection.
          session = None
          # True only on the handler instance whose connection created the session.
          _owns_session = False

          def open(self):
              if SingleSessionWSHandler.session is None:
                  self._owns_session = True
                  SingleSessionWSHandler.session = DesignatedThreadSession(thread,
                                                                           on_task_message=self.send_msg_to_client,
                                                                           loop=asyncio.get_event_loop())
                  websocket_conn_opened.set()
              else:
                  # A session already exists: reject the extra connection.
                  self.close()

          def on_close(self):
              # A rejected connection also ends up here; it must not close the
              # session that belongs to the first connection.
              if self._owns_session and SingleSessionWSHandler.session is not None:
                  self.session.close()
                  logger.debug('DesignatedThreadSession.closed')

      async def wait_to_stop_loop():
          """Stop the IOLoop once only one alive non-daemon thread is counted."""
          alive_none_daemonic_thread_cnt = None
          while alive_none_daemonic_thread_cnt != 1:
              # ``Thread.daemon`` replaces the deprecated ``Thread.isDaemon()``.
              alive_none_daemonic_thread_cnt = sum(
                  1 for t in threading.enumerate() if t.is_alive() and not t.daemon
              )
              await asyncio.sleep(1)

          # Current thread is only one none-daemonic-thread, so exit
          logger.debug('Closing tornado ioloop...')
          tornado.ioloop.IOLoop.current().stop()

      def server_thread():
          # The server thread needs its own asyncio event loop.
          loop = asyncio.new_event_loop()
          asyncio.set_event_loop(loop)

          server, port = _setup_server(webio_handler=SingleSessionWSHandler, host='localhost')
          tornado.ioloop.IOLoop.current().spawn_callback(wait_to_stop_loop)
          tornado.ioloop.IOLoop.current().spawn_callback(open_webbrowser_on_server_started, 'localhost', port)
          tornado.ioloop.IOLoop.current().start()
          logger.debug('Tornado server exit')

      t = threading.Thread(target=server_thread, name='Tornado-server')
      t.start()

      # Block until the first WebSocket connection has created the session.
      websocket_conn_opened.wait()