asyncbased.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import logging
  2. import sys
  3. import traceback
  4. from contextlib import contextmanager
  5. import asyncio
  6. from ..utils import random_str
  7. from .base import AbstractSession
  8. logger = logging.getLogger(__name__)
  class WebIOFuture:
      """Awaitable placeholder that a task coroutine yields to its driver.

      :param coro: optional object stored on the instance as ``coro``;
          this class itself never inspects it.
      """

      def __init__(self, coro=None):
          self.coro = coro

      def __iter__(self):
          # Yield this object to whoever drives the generator, then evaluate
          # to the value that is sent back in.
          return (yield self)

      # Reusing the generator protocol makes instances usable with 'await'.
      __await__ = __iter__
  16. class _context:
  17. current_session = None # type:"AsyncBasedSession"
  18. current_task_id = None
  class AsyncBasedSession(AbstractSession):
      """A PyWebIO task session, created and maintained by a backend.

      The session is the bridge between a backend and the task coroutines:

      * When the backend receives data from the user's browser it calls
        ``send_client_event``, and the session resumes the matching coroutine.
      * When a coroutine calls an input/output function, ``send_task_message``
        is called; the session buffers the message and leaves it for the
        backend to collect with ``get_task_messages``.

      .. note::
          When the backend closes the connection in response to
          ``on_session_close`` it must make sure that every message buffered
          in the session has been delivered to the client.
      """

      @staticmethod
      def get_current_session() -> "AsyncBasedSession":
          """Return the session recorded in the module-level context.

          :raises RuntimeError: if no session is currently set.
          """
          if _context.current_session is None:
              raise RuntimeError("No current session found in context!")
          return _context.current_session

      @staticmethod
      def get_current_task_id():
          """Return the id of the task recorded in the module-level context.

          :raises RuntimeError: if no task is currently set.
          """
          if _context.current_task_id is None:
              raise RuntimeError("No current task found in context!")
          return _context.current_task_id

      def __init__(self, coroutine_func, on_task_message=None, on_session_close=None):
          """
          :param coroutine_func: coroutine function; it is called without
              arguments to create the main task, which is stepped immediately.
          :param on_task_message: callback invoked with this session each time
              a task sends a message to the session.
          :param on_session_close: callback invoked without arguments when the
              session ends. When the backend closes the connection in response
              to it, the backend must make sure every buffered message has
              been delivered to the client.
          """
          self._on_task_message = on_task_message or (lambda _: None)
          self._on_session_close = on_session_close or (lambda: None)
          self.unhandled_task_msgs = []

          self.coros = {}  # coro_id -> Task

          self._closed = False
          self.inactive_coro_instances = []  # coroutine objects waiting to be started

          self.main_task = Task(coroutine_func(), session=self, on_coro_stop=self._on_main_task_finish)
          self.coros[self.main_task.coro_id] = self.main_task

          self._step_task(self.main_task)

      def _step_task(self, task, result=None):
          """Resume ``task`` with ``result``, then start any queued coroutines."""
          task.step(result)
          if task.task_finished and task.coro_id in self.coros:
              # If ``task`` is the main task, it may have finished during
              # task.step(result), in which case self.coros was already cleared.
              logger.debug('del self.coros[%s]', task.coro_id)
              del self.coros[task.coro_id]

          while self.inactive_coro_instances and not self.main_task.task_finished:
              coro = self.inactive_coro_instances.pop()
              sub_task = Task(coro, session=self)
              self.coros[sub_task.coro_id] = sub_task
              sub_task.step()
              if sub_task.task_finished:
                  logger.debug('del self.coros[%s]', sub_task.coro_id)
                  del self.coros[sub_task.coro_id]

      def _on_main_task_finish(self):
          """Tell the client to close the session, then close it locally."""
          self.send_task_message(dict(command='close_session'))
          self.close()

      def send_task_message(self, message):
          """Send a message from inside a coroutine to the session.

          The message is buffered and the ``on_task_message`` callback is
          notified.

          :param dict message: the message
          """
          self.unhandled_task_msgs.append(message)
          self._on_task_message(self)

      async def next_client_event(self):
          """Suspend the calling coroutine and return the value it is resumed with."""
          res = await WebIOFuture()
          return res

      def send_client_event(self, event):
          """Send an event coming from the user's browser to the session.

          The task named by ``event['coro_id']`` is resumed with the event; an
          unknown id is logged and ignored.

          :param dict event: the event message; must contain a ``'coro_id'`` key
          """
          coro_id = event['coro_id']
          coro = self.coros.get(coro_id)
          if not coro:
              logger.error('coro not found, coro_id:%s', coro_id)
              return

          self._step_task(coro, event)

      def get_task_messages(self):
          """Return the buffered task messages and reset the buffer."""
          msgs = self.unhandled_task_msgs
          self.unhandled_task_msgs = []
          return msgs

      def _cleanup(self):
          """Close every task and every coroutine that was never started."""
          # Iterate over a snapshot so that closing a task can never disturb
          # the iteration over the dict.
          for t in list(self.coros.values()):
              t.close()
          self.coros = {}  # delete session tasks

          while self.inactive_coro_instances:
              coro = self.inactive_coro_instances.pop()
              coro.close()

      def close(self, no_session_close_callback=False):
          """Close this session. Calling it again on a closed session is a no-op.

          :param bool no_session_close_callback: do not invoke the
              ``on_session_close`` callback. A backend that calls ``close``
              itself may want to enable this.
          """
          # Both the main task finishing and the backend may call close();
          # without this guard cleanup and the callback would run twice.
          if self._closed:
              return
          self._closed = True

          self._cleanup()
          if not no_session_close_callback:
              self._on_session_close()
          # todo clean

      def closed(self):
          """Return True once ``close`` has been called."""
          return self._closed

      def on_task_exception(self):
          """Log the exception being handled and show its traceback to the user.

          Reads the active exception through ``sys.exc_info()``, so it has to
          be called while an exception is being handled.
          """
          from ..output import put_markdown  # todo
          logger.exception('Error in coroutine executing')
          exc_type, exc_value, tb = sys.exc_info()
          tb_len = len(list(traceback.walk_tb(tb)))
          # A negative limit keeps the last abs(limit) entries, so this omits
          # the outermost traceback entry (presumably the scheduler's frame).
          lines = traceback.format_exception(exc_type, exc_value, tb, limit=1 - tb_len)
          traceback_msg = ''.join(lines)
          put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)

      def run_async(self, coro_obj):
          """Queue a coroutine object; it is started the next time a task is stepped."""
          self.inactive_coro_instances.append(coro_obj)
  class Task:
      """Drives a single coroutine step by step on behalf of a session."""

      @contextmanager
      def session_context(self):
          """Publish this task and its session in ``_context`` while a step runs.

          Usage::

              with self.session_context():
                  res = self.coro.send(data)

          The values that were in ``_context`` on entry are restored on exit,
          so a nested ``with`` block no longer wipes out the context of the
          enclosing one.
          """
          prev_session = _context.current_session
          prev_task_id = _context.current_task_id
          _context.current_session = self.session
          _context.current_task_id = self.coro_id
          try:
              yield
          finally:
              _context.current_session = prev_session
              _context.current_task_id = prev_task_id

      @staticmethod
      def gen_coro_id(coro=None):
          """Build an id of the form ``<name>-<random string>``.

          ``<name>`` is ``coro.__name__`` when available, otherwise ``'coro'``.
          """
          name = 'coro'
          if hasattr(coro, '__name__'):
              name = coro.__name__

          return '%s-%s' % (name, random_str(10))

      def __init__(self, coro, session: AsyncBasedSession, on_coro_stop=None):
          """
          :param coro: the coroutine object to drive
          :param session: the session this task belongs to
          :param on_coro_stop: callback invoked without arguments when the
              coroutine stops, either by returning or by raising
          """
          self.session = session
          self.coro = coro
          self.result = None
          self.task_finished = False  # task completed / cancelled
          self.on_coro_stop = on_coro_stop or (lambda: None)

          self.coro_id = self.gen_coro_id(self.coro)

          self.pending_futures = {}  # id(future) -> future

          logger.debug('Task[%s] created ', self.coro_id)

      def step(self, result=None):
          """Resume the coroutine with ``result`` and handle what it yields.

          If the coroutine yields an object with ``add_done_callback`` (or a
          ``WebIOFuture`` wrapping a coroutine, which is scheduled on the event
          loop), the task resumes itself with that future's result once it
          completes.
          """
          coro_yield = None
          with self.session_context():
              try:
                  coro_yield = self.coro.send(result)
              except StopIteration as e:
                  if len(e.args) == 1:
                      self.result = e.args[0]
                  self.task_finished = True
                  logger.debug('Task[%s] finished', self.coro_id)
                  self.on_coro_stop()
              except Exception:
                  self.session.on_task_exception()
                  # A coroutine that raised cannot be resumed: treat it as
                  # finished so the session drops it and the stop callback runs.
                  self.task_finished = True
                  self.on_coro_stop()

          future = None
          if isinstance(coro_yield, WebIOFuture):
              if coro_yield.coro:
                  future = asyncio.run_coroutine_threadsafe(coro_yield.coro, asyncio.get_event_loop())
          elif coro_yield is not None:
              future = coro_yield
          if not self.session.closed() and hasattr(future, 'add_done_callback'):
              # Record the future before registering the callback: a future
              # that is already done may invoke the callback immediately.
              self.pending_futures[id(future)] = future
              future.add_done_callback(self._tornado_future_callback)

      def _tornado_future_callback(self, future):
          """Resume the task with the result of a completed future."""
          # Always forget the future, whether it completed or was cancelled.
          self.pending_futures.pop(id(future), None)
          if future.cancelled() or self.task_finished:
              # Never resume a task that was closed while the future was pending.
              return
          # NOTE: if the future holds an exception, result() raises it here
          # and the coroutine is not resumed.
          self.step(future.result())

      def close(self):
          """Close the coroutine and cancel every future it is waiting on."""
          logger.debug('Task[%s] closed', self.coro_id)
          self.coro.close()
          self.task_finished = True
          while self.pending_futures:
              _, f = self.pending_futures.popitem()
              f.cancel()

      def __del__(self):
          if not self.task_finished:
              logger.warning('Task[%s] not finished when destroy', self.coro_id)