1
0

framework.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import logging
  2. import random
  3. import string
  4. import sys
  5. import time
  6. import traceback
  7. from collections import defaultdict
  8. from contextlib import contextmanager
  9. logger = logging.getLogger(__name__)
  10. class WebIOFuture:
  11. def __iter__(self):
  12. result = yield
  13. return result
  14. __await__ = __iter__ # make compatible with 'await' expression
  15. class WebIOSession:
  16. def __init__(self, coro_func, server_msg_listener=None):
  17. self._server_msg_listener = server_msg_listener or (lambda _: None)
  18. self.unhandled_server_msgs = []
  19. self.coros = {} # coro_id -> coro
  20. self._closed = False
  21. self.inactive_coro_instances = [] # 待激活的协程实例列表
  22. self.main_task = Task(coro_func(), ws=self)
  23. self.coros[self.main_task.coro_id] = self.main_task
  24. self._step_task(self.main_task)
  25. def _step_task(self, task, result=None):
  26. task.step(result)
  27. if task.task_finished:
  28. logger.debug('del self.coros[%s]', task.coro_id)
  29. del self.coros[task.coro_id]
  30. while self.inactive_coro_instances:
  31. coro = self.inactive_coro_instances.pop()
  32. sub_task = Task(coro, ws=self)
  33. self.coros[sub_task.coro_id] = sub_task
  34. sub_task.step()
  35. if sub_task.task_finished:
  36. logger.debug('del self.coros[%s]', sub_task.coro_id)
  37. del self.coros[sub_task.coro_id]
  38. if self.main_task.task_finished:
  39. self.close()
  40. def add_server_msg(self, message):
  41. self.unhandled_server_msgs.append(message)
  42. self._server_msg_listener(self)
  43. def add_client_msg(self, message):
  44. # data = json.loads(message)
  45. coro_id = message['coro_id']
  46. coro = self.coros.get(coro_id)
  47. if not coro:
  48. logger.error('coro not found, coro_id:%s', coro_id)
  49. return
  50. self._step_task(coro, message)
  51. def on_coro_error(self):
  52. from pywebio.output import put_markdown # todo
  53. type, value, tb = sys.exc_info()
  54. tb_len = len(list(traceback.walk_tb(tb)))
  55. lines = traceback.format_exception(type, value, tb, limit=1 - tb_len)
  56. traceback_msg = ''.join(lines)
  57. put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)
  58. def _cleanup(self):
  59. for t in self.coros.values():
  60. t.cancel()
  61. self.coros = {} # delete session tasks
  62. while self.inactive_coro_instances:
  63. coro = self.inactive_coro_instances.pop()
  64. coro.close()
  65. def close(self):
  66. """关闭当前Session"""
  67. self._cleanup()
  68. self._closed = True
  69. # todo clean
  70. def closed(self):
  71. return self._closed
  72. class Task:
  73. @contextmanager
  74. def ws_context(self):
  75. """
  76. >>> with ws_context():
  77. ... res = self.coros[-1].send(data)
  78. """
  79. Global.active_ws = self.ws
  80. Global.active_coro_id = self.coro_id
  81. try:
  82. yield
  83. finally:
  84. Global.active_ws = None
  85. Global.active_coro_id = None
  86. @staticmethod
  87. def gen_coro_id(coro=None):
  88. name = 'coro'
  89. if hasattr(coro, '__name__'):
  90. name = coro.__name__
  91. random_str = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(10))
  92. return '%s-%s' % (name, random_str)
  93. def __init__(self, coro, ws):
  94. self.ws = ws
  95. self.coro = coro
  96. self.coro_id = None
  97. self.result = None
  98. self.task_finished = False # 任务完毕/取消
  99. self.coro_id = self.gen_coro_id(self.coro)
  100. self.pending_futures = {} # id(future) -> future
  101. logger.debug('Task[%s] created ', self.coro_id)
  102. def step(self, result=None):
  103. future_or_none = None
  104. with self.ws_context():
  105. try:
  106. future_or_none = self.coro.send(result)
  107. except StopIteration as e:
  108. if len(e.args) == 1:
  109. self.result = e.args[0]
  110. self.task_finished = True
  111. logger.debug('Task[%s] finished', self.coro_id)
  112. except Exception as e:
  113. self.ws.on_coro_error()
  114. if not isinstance(future_or_none, WebIOFuture) and future_or_none is not None:
  115. if not self.ws.closed():
  116. future_or_none.add_done_callback(self._tornado_future_callback)
  117. self.pending_futures[id(future_or_none)] = future_or_none
  118. def _tornado_future_callback(self, future):
  119. if not future.cancelled():
  120. del self.pending_futures[id(future)]
  121. self.step(future.result())
  122. def cancel(self):
  123. logger.debug('Task[%s] canceled', self.coro_id)
  124. self.coro.close()
  125. while self.pending_futures:
  126. _, f = self.pending_futures.popitem()
  127. f.cancel()
  128. self.task_finished = True
  129. def __del__(self):
  130. if not self.task_finished:
  131. logger.warning('Task[%s] not finished when destroy', self.coro_id)
  132. class Global:
  133. # todo issue: with 语句可能发生嵌套,导致内层with退出时,将属性置空
  134. active_ws = None # type:"WebIOController"
  135. active_coro_id = None