1
0

tornado.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. import json
  2. from collections import OrderedDict
  3. import tornado
  4. import tornado.websocket
  5. from tornado.gen import coroutine
  6. from tornado.log import gen_log
  7. from ..framework import Task
  8. from .. import project_dir
  9. import sys, traceback
  10. from ..output import put_markdown
  11. STATIC_PATH = '%s/html' % project_dir
  12. def ws_handler(coro_func, debug=True):
  13. class WSHandler(tornado.websocket.WebSocketHandler):
  14. def check_origin(self, origin):
  15. return True
  16. def get_compression_options(self):
  17. # Non-None enables compression with default options.
  18. return {}
  19. @coroutine
  20. def open(self):
  21. print("WebSocket opened")
  22. self.set_nodelay(True)
  23. ############
  24. self.coros = {} # coro_id -> coro
  25. # self.callbacks = OrderedDict() # UI元素时的回调, callback_id -> (coro, save)
  26. # self.mark2id = {} # mark_name -> mark_id
  27. self._closed = False
  28. self.inactive_coro_instances = [] # 待激活的协程实例列表
  29. self.main_task = Task(coro_func(), ws=self)
  30. self.coros[self.main_task.coro_id] = self.main_task
  31. self.step_task(self.main_task)
  32. def step_task(self, task, result=None):
  33. task.step(result)
  34. if task.task_finished:
  35. gen_log.debug('del self.coros[%s]', task.coro_id)
  36. del self.coros[task.coro_id]
  37. while self.inactive_coro_instances:
  38. coro = self.inactive_coro_instances.pop()
  39. task = Task(coro, ws=self)
  40. self.coros[task.coro_id] = task
  41. task.step()
  42. if self.coros[task.coro_id].task_finished:
  43. gen_log.debug('del self.coros[%s]', task.coro_id)
  44. del self.coros[task.coro_id]
  45. if self.main_task.task_finished:
  46. for t in self.coros:
  47. t.cancel()
  48. self.close()
  49. def on_message(self, message):
  50. # print('on_message', message)
  51. data = json.loads(message)
  52. coro_id = data['coro_id']
  53. coro = self.coros.get(coro_id)
  54. if not coro:
  55. gen_log.error('coro not found, coro_id:%s', coro_id)
  56. return
  57. self.step_task(coro, data)
  58. def on_coro_error(self):
  59. type, value, tb = sys.exc_info()
  60. tb_len = len(list(traceback.walk_tb(tb)))
  61. lines = traceback.format_exception(type, value, tb, limit=1 - tb_len)
  62. traceback_msg = ''.join(lines)
  63. put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)
  64. def on_close(self):
  65. self._closed = True
  66. print("WebSocket closed")
  67. def closed(self):
  68. return self._closed
  69. return WSHandler