1
0

framework.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. import logging
  2. import sys
  3. import traceback
  4. from contextlib import contextmanager
  5. import asyncio
  6. from .utils import random_str
  7. logger = logging.getLogger(__name__)
  8. class WebIOFuture:
  9. def __init__(self, coro=None):
  10. self.coro = coro
  11. def __iter__(self):
  12. result = yield self
  13. return result
  14. __await__ = __iter__ # make compatible with 'await' expression
  15. class WebIOSession:
  16. """
  17. 一个PyWebIO任务会话, 由不同的后端Backend创建并维护
  18. WebIOSession是不同的后端Backend与协程交互的桥梁:
  19. 后端Backend在接收到用户浏览器的数据后,会通过调用 ``send_client_msg`` 来通知会话,进而由WebIOSession驱动协程的运行。
  20. 协程内在调用输入输出函数后,会调用 ``send_coro_msg`` 向会话发送输入输出消息指令, WebIOSession将其保存并留给后端Backend处理。
  21. .. note::
  22. 后端Backend在相应on_session_close时关闭连接时,需要保证会话内的所有消息都传送到了客户端
  23. """
  24. def __init__(self, coro_func, on_coro_msg=None, on_session_close=None):
  25. """
  26. :param coro_func: 协程函数
  27. :param on_coro_msg: 由协程内发给session的消息的处理函数
  28. :param on_session_close: 会话结束的处理函数。后端Backend在相应on_session_close时关闭连接时,需要保证会话内的所有消息都传送到了客户端
  29. """
  30. self._on_coro_msg = on_coro_msg or (lambda _: None)
  31. self._on_session_close = on_session_close or (lambda: None)
  32. self.unhandled_server_msgs = []
  33. self.coros = {} # coro_id -> coro
  34. self._closed = False
  35. self.inactive_coro_instances = [] # 待激活的协程实例列表
  36. self.main_task = Task(coro_func(), ws=self)
  37. self.coros[self.main_task.coro_id] = self.main_task
  38. self._step_task(self.main_task)
  39. def _step_task(self, task, result=None):
  40. task.step(result)
  41. if task.task_finished:
  42. logger.debug('del self.coros[%s]', task.coro_id)
  43. del self.coros[task.coro_id]
  44. while self.inactive_coro_instances:
  45. coro = self.inactive_coro_instances.pop()
  46. sub_task = Task(coro, ws=self)
  47. self.coros[sub_task.coro_id] = sub_task
  48. sub_task.step()
  49. if sub_task.task_finished:
  50. logger.debug('del self.coros[%s]', sub_task.coro_id)
  51. del self.coros[sub_task.coro_id]
  52. if self.main_task.task_finished:
  53. self.send_coro_msg(dict(command='close_session'))
  54. self.close()
  55. def send_coro_msg(self, message):
  56. """向会话发送来自协程内的消息
  57. :param dict message: 消息
  58. """
  59. self.unhandled_server_msgs.append(message)
  60. self._on_coro_msg(self)
  61. def send_client_msg(self, message):
  62. """向会话发送来自用户浏览器的事件️
  63. :param dict message: 事件️消息
  64. """
  65. # data = json.loads(message)
  66. coro_id = message['coro_id']
  67. coro = self.coros.get(coro_id)
  68. if not coro:
  69. logger.error('coro not found, coro_id:%s', coro_id)
  70. return
  71. self._step_task(coro, message)
  72. def on_coro_error(self):
  73. from .output import put_markdown # todo
  74. logger.exception('Error in coroutine executing')
  75. type, value, tb = sys.exc_info()
  76. tb_len = len(list(traceback.walk_tb(tb)))
  77. lines = traceback.format_exception(type, value, tb, limit=1 - tb_len)
  78. traceback_msg = ''.join(lines)
  79. put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)
  80. def _cleanup(self):
  81. for t in self.coros.values():
  82. t.cancel()
  83. self.coros = {} # delete session tasks
  84. while self.inactive_coro_instances:
  85. coro = self.inactive_coro_instances.pop()
  86. coro.close()
  87. def close(self, no_session_close_callback=False):
  88. """关闭当前Session
  89. :param bool no_session_close_callback: 不调用 on_session_close 会话结束的处理函数。
  90. 当 close 是由后端Backend调用时可能希望开启 no_session_close_callback
  91. """
  92. self._cleanup()
  93. self._closed = True
  94. if not no_session_close_callback:
  95. self._on_session_close()
  96. # todo clean
  97. def closed(self):
  98. return self._closed
  99. class Task:
  100. @contextmanager
  101. def ws_context(self):
  102. """
  103. >>> with ws_context():
  104. ... res = self.coros[-1].send(data)
  105. """
  106. Global.active_ws = self.ws
  107. Global.active_coro_id = self.coro_id
  108. try:
  109. yield
  110. finally:
  111. Global.active_ws = None
  112. Global.active_coro_id = None
  113. @staticmethod
  114. def gen_coro_id(coro=None):
  115. name = 'coro'
  116. if hasattr(coro, '__name__'):
  117. name = coro.__name__
  118. return '%s-%s' % (name, random_str(10))
  119. def __init__(self, coro, ws):
  120. self.ws = ws
  121. self.coro = coro
  122. self.coro_id = None
  123. self.result = None
  124. self.task_finished = False # 任务完毕/取消
  125. self.coro_id = self.gen_coro_id(self.coro)
  126. self.pending_futures = {} # id(future) -> future
  127. logger.debug('Task[%s] created ', self.coro_id)
  128. def step(self, result=None):
  129. coro_yield = None
  130. with self.ws_context():
  131. try:
  132. coro_yield = self.coro.send(result)
  133. except StopIteration as e:
  134. if len(e.args) == 1:
  135. self.result = e.args[0]
  136. self.task_finished = True
  137. logger.debug('Task[%s] finished', self.coro_id)
  138. except Exception as e:
  139. self.ws.on_coro_error()
  140. future = None
  141. if isinstance(coro_yield, WebIOFuture):
  142. if coro_yield.coro:
  143. future = asyncio.run_coroutine_threadsafe(coro_yield.coro, asyncio.get_event_loop())
  144. elif coro_yield is not None:
  145. future = coro_yield
  146. if not self.ws.closed() and hasattr(future, 'add_done_callback'):
  147. future.add_done_callback(self._tornado_future_callback)
  148. self.pending_futures[id(future)] = future
  149. def _tornado_future_callback(self, future):
  150. if not future.cancelled():
  151. del self.pending_futures[id(future)]
  152. self.step(future.result())
  153. def cancel(self):
  154. logger.debug('Task[%s] canceled', self.coro_id)
  155. self.coro.close()
  156. while self.pending_futures:
  157. _, f = self.pending_futures.popitem()
  158. f.cancel()
  159. self.task_finished = True
  160. def __del__(self):
  161. if not self.task_finished:
  162. logger.warning('Task[%s] not finished when destroy', self.coro_id)
  163. class Global:
  164. # todo issue: with 语句可能发生嵌套,导致内层with退出时,将属性置空
  165. active_ws = None # type:"WebIOController"
  166. active_coro_id = None