framework.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import tornado.websocket
  2. import time, json
  3. from collections import defaultdict
  4. from tornado.gen import coroutine, sleep
  5. import random, string
  6. from contextlib import contextmanager
  7. from tornado.log import gen_log
  8. from tornado import ioloop
  9. # from tornado.concurrent import Future
  10. class WebIOFuture:
  11. def __iter__(self):
  12. result = yield
  13. return result
  14. __await__ = __iter__ # make compatible with 'await' expression
  15. class Task:
  16. @contextmanager
  17. def ws_context(self):
  18. """
  19. >>> with ws_context():
  20. ... res = self.coros[-1].send(data)
  21. """
  22. Global.active_ws = self.ws
  23. Global.active_coro_id = self.coro_id
  24. try:
  25. yield
  26. finally:
  27. Global.active_ws = None
  28. Global.active_coro_id = None
  29. @staticmethod
  30. def gen_coro_id(coro=None):
  31. name = 'coro'
  32. if hasattr(coro, '__name__'):
  33. name = coro.__name__
  34. random_str = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.digits) for _ in range(10))
  35. return '%s-%s' % (name, random_str)
  36. def __init__(self, coro, ws):
  37. self.ws = ws
  38. self.coro = coro
  39. self.coro_id = None
  40. self.result = None
  41. self.task_finished = False # 协程完毕
  42. self.coro_id = self.gen_coro_id(self.coro)
  43. self.pending_futures = {} # id(future) -> future
  44. gen_log.debug('Task[%s] __init__ ', self.coro_id)
  45. def step(self, result=None):
  46. future_or_none = None
  47. with self.ws_context():
  48. try:
  49. future_or_none = self.coro.send(result)
  50. except StopIteration as e:
  51. if len(e.args) == 1:
  52. self.result = e.args[0]
  53. self.task_finished = True
  54. gen_log.debug('Task[%s] finished', self.coro_id)
  55. except Exception as e:
  56. self.ws.on_coro_error()
  57. if not isinstance(future_or_none, WebIOFuture) and future_or_none is not None:
  58. if not self.ws.closed():
  59. future_or_none.add_done_callback(self._tornado_future_callback)
  60. self.pending_futures[id(future_or_none)] = future_or_none
  61. def _tornado_future_callback(self, future):
  62. del self.pending_futures[id(future)]
  63. self.step(future.result())
  64. def cancel(self):
  65. gen_log.debug('Task[%s] canceled', self.coro_id)
  66. self.coro.close()
  67. while self.pending_futures:
  68. _, f = self.pending_futures.popitem()
  69. f.cancel()
  70. def __del__(self):
  71. if not self.task_finished:
  72. gen_log.warning('Task[%s] not finished when destroy', self.coro_id)
  73. class Msg:
  74. mid2callback = defaultdict(list)
  75. @staticmethod
  76. def gen_msg_id():
  77. mid = '%s-%s' % (Global.active_ws.sid, int(time.time()))
  78. return mid
  79. @classmethod
  80. def add_callback(cls, msg_id, callback):
  81. cls.mid2callback[msg_id].append(callback)
  82. @classmethod
  83. def get_callbacks(cls, msg_id):
  84. return cls.mid2callback[msg_id]
  85. @classmethod
  86. def get_callbacks(cls, msg_id):
  87. return cls.mid2callback[msg_id]
  88. @classmethod
  89. def unregister_msg(cls, msg_id):
  90. del cls.mid2callback[msg_id]
  91. class Global:
  92. # todo issue: with 语句可能发生嵌套,导致内层with退出时,将属性置空
  93. active_ws = None # type:"EchoWebSocket"
  94. active_coro_id = None