framework.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import random
  2. import string
  3. import sys
  4. import time
  5. import traceback
  6. from collections import defaultdict
  7. from contextlib import contextmanager
  8. from tornado.log import gen_log
  class WebIOFuture:
      """Awaitable placeholder whose result is whatever value is sent into it.

      Iterating (or awaiting) an instance suspends exactly once. The value
      delivered through ``send()`` at that suspension point becomes the result
      of the surrounding ``yield from`` / ``await`` expression.
      """

      def __iter__(self):
          # Suspend once; the value passed to send() is returned to the awaiter.
          return (yield)

      # Alias so that instances are also usable in an 'await' expression.
      __await__ = __iter__
  class WebIOSession:
      """Owns the tasks of one session and routes client messages to them.

      The session wraps the coroutine returned by ``coro_func`` in a main
      ``Task``, keeps every live task in ``self.coros`` keyed by its coro id,
      and closes itself once the main task has finished.
      """

      def __init__(self, coro_func, server_msg_listener=None):
          """Create the session and immediately run the main task's first step.

          :param coro_func: zero-argument callable whose return value is
              wrapped in the main ``Task``.
          :param server_msg_listener: optional callable invoked with this
              session every time a server message is queued; defaults to a
              no-op.
          """
          self._server_msg_listener = server_msg_listener or (lambda _: None)
          self.unhandled_server_msgs = []  # server messages queued by add_server_msg
          self.coros = {}  # coro_id -> Task
          self._closed = False
          self.inactive_coro_instances = []  # coroutine instances waiting to be activated

          self.main_task = Task(coro_func(), ws=self)
          self.coros[self.main_task.coro_id] = self.main_task
          self._step_task(self.main_task)

      def _step_task(self, task, result=None):
          """Advance ``task`` by one step, then start any queued coroutines.

          Finished tasks are dropped from ``self.coros``. When the main task
          has finished, the whole session is closed.

          :param task: the ``Task`` to advance.
          :param result: value sent into the task's coroutine.
          """
          task.step(result)
          if task.task_finished:
              gen_log.debug('del self.coros[%s]', task.coro_id)
              # pop() instead of del: _cleanup() replaces the registry, so the
              # entry may already be gone if the session was closed meanwhile.
              self.coros.pop(task.coro_id, None)

          # list.pop() takes from the end, so the most recently queued
          # coroutine is started first.
          while self.inactive_coro_instances:
              coro = self.inactive_coro_instances.pop()
              sub_task = Task(coro, ws=self)
              self.coros[sub_task.coro_id] = sub_task
              sub_task.step()
              if sub_task.task_finished:
                  gen_log.debug('del self.coros[%s]', sub_task.coro_id)
                  self.coros.pop(sub_task.coro_id, None)

          if self.main_task.task_finished:
              self.close()

      def add_server_msg(self, message):
          """Queue a server message and notify the listener with this session."""
          self.unhandled_server_msgs.append(message)
          self._server_msg_listener(self)

      def add_client_msg(self, message):
          """Deliver a client message to the task named by ``message['coro_id']``.

          Messages without a ``coro_id`` entry, or naming an unknown task, are
          logged and ignored instead of raising.

          :param message: mapping carrying a ``'coro_id'`` key; the whole
              message is sent into the matching task.
          """
          try:
              coro_id = message['coro_id']
          except (KeyError, TypeError):
              # Client input is not trusted: treat a malformed message the
              # same way as one that names an unknown task.
              gen_log.error('client message without coro_id: %r', message)
              return

          coro = self.coros.get(coro_id)
          if not coro:
              gen_log.error('coro not found, coro_id:%s', coro_id)
              return

          self._step_task(coro, message)

      def on_coro_error(self):
          """Render the exception currently being handled via ``put_markdown``.

          Reads the active exception from ``sys.exc_info()``, so it is meant to
          be called from inside an ``except`` block.
          """
          from pywebio.output import put_markdown  # todo

          exc_type, exc_value, tb = sys.exc_info()
          tb_len = len(list(traceback.walk_tb(tb)))
          # A negative limit keeps only the innermost entries, so this drops
          # the outermost traceback frame (presumably the caller that resumed
          # the coroutine - confirm).
          lines = traceback.format_exception(exc_type, exc_value, tb, limit=1 - tb_len)
          traceback_msg = ''.join(lines)
          put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)

      def _cleanup(self):
          """Cancel every registered task and close coroutines never started."""
          for t in self.coros.values():
              t.cancel()
          self.coros = {}  # delete session tasks

          while self.inactive_coro_instances:
              coro = self.inactive_coro_instances.pop()
              coro.close()

      def close(self):
          """Close the current session."""
          self._cleanup()
          self._closed = True
          # todo clean

      def closed(self):
          """Return True once ``close()`` has completed."""
          return self._closed
  class Task:
      """Drives a single coroutine/generator on behalf of a session.

      A task resumes its coroutine with ``step()``, records the return value
      in ``result``, and tracks the futures the coroutine is waiting on so
      they can be cancelled together with the task.
      """

      @contextmanager
      def ws_context(self):
          """Publish this task's session and coro id on ``Global`` for a block.

          The values that were active on entry are put back on exit, so leaving
          a nested ``ws_context`` does not wipe out the outer task's context.
          """
          previous_ws = Global.active_ws
          previous_coro_id = Global.active_coro_id
          Global.active_ws = self.ws
          Global.active_coro_id = self.coro_id
          try:
              yield
          finally:
              Global.active_ws = previous_ws
              Global.active_coro_id = previous_coro_id

      @staticmethod
      def gen_coro_id(coro=None):
          """Return ``'<name>-<10 random chars>'`` as an id for ``coro``.

          ``<name>`` is ``coro.__name__`` when the object has one, else
          ``'coro'``. The suffix is drawn from lowercase letters and digits
          using ``random.SystemRandom``.
          """
          name = getattr(coro, '__name__', 'coro')
          rng = random.SystemRandom()
          alphabet = string.ascii_lowercase + string.digits
          random_str = ''.join(rng.choice(alphabet) for _ in range(10))
          return '%s-%s' % (name, random_str)

      def __init__(self, coro, ws):
          """
          :param coro: coroutine/generator object to drive.
          :param ws: the owning session; must provide ``closed()`` and
              ``on_coro_error()``.
          """
          self.ws = ws
          self.coro = coro
          self.coro_id = None
          self.result = None
          self.task_finished = False  # task completed or cancelled

          self.coro_id = self.gen_coro_id(self.coro)

          self.pending_futures = {}  # id(future) -> future

          gen_log.debug('Task[%s] created ', self.coro_id)

      def step(self, result=None):
          """Resume the coroutine with ``result`` and handle what it yields.

          * Normal return: the return value is stored in ``self.result`` and
            the task is marked finished.
          * Exception: it is reported through ``ws.on_coro_error()`` and the
            task is marked finished, since the coroutine cannot be resumed.
          * Yielded object that is neither ``None`` nor a ``WebIOFuture``: it
            is treated as a future and this task is resumed when it completes,
            unless the session is already closed.

          :param result: value sent into the coroutine.
          """
          future_or_none = None
          with self.ws_context():
              try:
                  future_or_none = self.coro.send(result)
              except StopIteration as e:
                  if len(e.args) == 1:
                      self.result = e.args[0]
                  self.task_finished = True
                  gen_log.debug('Task[%s] finished', self.coro_id)
              except Exception:
                  # A coroutine that raised is exhausted; mark it finished now
                  # so the session can drop it (and close, if it is the main
                  # task) instead of keeping a dead task registered.
                  self.task_finished = True
                  self.ws.on_coro_error()

          if future_or_none is None or isinstance(future_or_none, WebIOFuture):
              return

          if not self.ws.closed():
              # Register before attaching the callback: a future that is
              # already done may invoke the callback immediately.
              self.pending_futures[id(future_or_none)] = future_or_none
              future_or_none.add_done_callback(self._tornado_future_callback)

      def _tornado_future_callback(self, future):
          """Resume the task with the result of a completed, non-cancelled future."""
          if not future.cancelled():
              # pop() tolerates a future already removed by cancel().
              self.pending_futures.pop(id(future), None)
              self.step(future.result())

      def cancel(self):
          """Close the coroutine, cancel its pending futures and mark it finished."""
          gen_log.debug('Task[%s] canceled', self.coro_id)
          self.coro.close()
          while self.pending_futures:
              _, f = self.pending_futures.popitem()
              f.cancel()

          self.task_finished = True

      def __del__(self):
          # Warn when a task is garbage-collected without finishing or being cancelled.
          if not self.task_finished:
              gen_log.warning('Task[%s] not finished when destroy', self.coro_id)
  class Msg:
      """Class-level registry that maps message ids to their callbacks."""

      mid2callback = defaultdict(list)  # msg_id -> list of callbacks

      @staticmethod
      def gen_msg_id():
          """Build an id from the active session's ``sid`` and the current time.

          The time component is the Unix time truncated to whole seconds.
          NOTE(review): two ids generated for the same session within one
          second are identical - confirm callers tolerate this.
          """
          mid = '%s-%s' % (Global.active_ws.sid, int(time.time()))
          return mid

      @classmethod
      def add_callback(cls, msg_id, callback):
          """Append ``callback`` to the callbacks registered for ``msg_id``."""
          cls.mid2callback[msg_id].append(callback)

      @classmethod
      def get_callbacks(cls, msg_id):
          """Return the live list of callbacks registered for ``msg_id``.

          Because the registry is a ``defaultdict``, asking for an unknown id
          creates and stores an empty list for it.
          """
          return cls.mid2callback[msg_id]

      @classmethod
      def unregister_msg(cls, msg_id):
          """Remove ``msg_id`` and its callbacks.

          :raises KeyError: if ``msg_id`` is not registered.
          """
          del cls.mid2callback[msg_id]
  class Global:
      """Shared holder for the session and coro id of the task being stepped.

      Both attributes are ``None`` by default; ``Task.ws_context`` assigns them
      for the duration of a ``with`` block.
      """

      # todo issue: `with` statements may nest; when the inner `with` exits it
      # resets these attributes to None, losing the outer context.
      active_coro_id = None
      active_ws = None  # type:"WebIOController"