wangweimin преди 5 години
родител
ревизия
2c95242491
променени са 5 файла, в които са добавени 102 реда и са изтрити 74 реда
  1. 89 11
      pywebio/framework.py
  2. 0 1
      pywebio/input.py
  3. 3 3
      pywebio/input_ctrl.py
  4. 1 1
      pywebio/output.py
  5. 9 58
      pywebio/platform/tornado.py

+ 89 - 11
pywebio/framework.py

@@ -1,12 +1,12 @@
-import tornado.websocket
-import time, json
+import random
+import string
+import sys
+import time
+import traceback
 from collections import defaultdict
-from tornado.gen import coroutine, sleep
-import random, string
 from contextlib import contextmanager
+
 from tornado.log import gen_log
-from tornado import ioloop
-# from tornado.concurrent import Future
 
 
 class WebIOFuture:
@@ -17,6 +17,81 @@ class WebIOFuture:
     __await__ = __iter__  # make compatible with 'await' expression
 
 
+class WebIOSession:
+    def __init__(self, coro_func, server_msg_listener=None):
+        self._server_msg_listener = server_msg_listener or (lambda _: None)
+        self.unhandled_server_msgs = []
+
+        self.coros = {}  # coro_id -> coro
+
+        self._closed = False
+        self.inactive_coro_instances = []  # 待激活的协程实例列表
+
+        self.main_task = Task(coro_func(), ws=self)
+        self.coros[self.main_task.coro_id] = self.main_task
+
+        self._step_task(self.main_task)
+
+    def _step_task(self, task, result=None):
+        task.step(result)
+        if task.task_finished:
+            gen_log.debug('del self.coros[%s]', task.coro_id)
+            del self.coros[task.coro_id]
+
+        while self.inactive_coro_instances:
+            coro = self.inactive_coro_instances.pop()
+            sub_task = Task(coro, ws=self)
+            self.coros[sub_task.coro_id] = sub_task
+            sub_task.step()
+            if sub_task.task_finished:
+                gen_log.debug('del self.coros[%s]', sub_task.coro_id)
+                del self.coros[sub_task.coro_id]
+
+        if self.main_task.task_finished:
+            self.close()
+
+    def add_server_msg(self, message):
+        self.unhandled_server_msgs.append(message)
+        self._server_msg_listener(self)
+
+    def add_client_msg(self, message):
+        # data = json.loads(message)
+        coro_id = message['coro_id']
+        coro = self.coros.get(coro_id)
+        if not coro:
+            gen_log.error('coro not found, coro_id:%s', coro_id)
+            return
+
+        self._step_task(coro, message)
+
+    def on_coro_error(self):
+        from pywebio.output import put_markdown  # todo
+
+        type, value, tb = sys.exc_info()
+        tb_len = len(list(traceback.walk_tb(tb)))
+        lines = traceback.format_exception(type, value, tb, limit=1 - tb_len)
+        traceback_msg = ''.join(lines)
+        put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)
+
+    def _cleanup(self):
+        for t in self.coros.values():
+            t.cancel()
+        self.coros = {}  # delete session tasks
+
+        while self.inactive_coro_instances:
+            coro = self.inactive_coro_instances.pop()
+            coro.close()
+
+    def close(self):
+        """关闭当前Session"""
+        self._cleanup()
+        self._closed = True
+        # todo clean
+
+    def closed(self):
+        return self._closed
+
+
 class Task:
     @contextmanager
     def ws_context(self):
@@ -46,13 +121,13 @@ class Task:
         self.coro = coro
         self.coro_id = None
         self.result = None
-        self.task_finished = False  # 协程完毕
+        self.task_finished = False  # 任务完毕/取消
 
         self.coro_id = self.gen_coro_id(self.coro)
 
         self.pending_futures = {}  # id(future) -> future
 
-        gen_log.debug('Task[%s] __init__ ', self.coro_id)
+        gen_log.debug('Task[%s] created ', self.coro_id)
 
     def step(self, result=None):
         future_or_none = None
@@ -73,8 +148,9 @@ class Task:
                 self.pending_futures[id(future_or_none)] = future_or_none
 
     def _tornado_future_callback(self, future):
-        del self.pending_futures[id(future)]
-        self.step(future.result())
+        if not future.cancelled():
+            del self.pending_futures[id(future)]
+            self.step(future.result())
 
     def cancel(self):
         gen_log.debug('Task[%s] canceled', self.coro_id)
@@ -83,6 +159,8 @@ class Task:
             _, f = self.pending_futures.popitem()
             f.cancel()
 
+        self.task_finished = True
+
     def __del__(self):
         if not self.task_finished:
             gen_log.warning('Task[%s] not finished when destroy', self.coro_id)
@@ -115,5 +193,5 @@ class Msg:
 
 class Global:
     # todo issue: with 语句可能发生嵌套,导致内层with退出时,将属性置空
-    active_ws = None  # type:"EchoWebSocket"
+    active_ws = None  # type:"WebIOController"
     active_coro_id = None

+ 0 - 1
pywebio/input.py

@@ -27,7 +27,6 @@ import json
 import logging
 from collections.abc import Mapping
 from base64 import b64decode
-from .framework import Global
 from .input_ctrl import send_msg, single_input, input_control
 from typing import Coroutine, Callable
 

+ 3 - 3
pywebio/input_ctrl.py

@@ -7,13 +7,13 @@ from tornado.log import gen_log
 logger = logging.getLogger(__name__)
 
 
-def run_async(coro):
-    Global.active_ws.inactive_coro_instances.append(coro)
+def run_async(coro_obj):
+    Global.active_ws.inactive_coro_instances.append(coro_obj)
 
 
 def send_msg(cmd, spec=None):
     msg = dict(command=cmd, spec=spec, coro_id=Global.active_coro_id)
-    Global.active_ws.write_message(json.dumps(msg))
+    Global.active_ws.add_server_msg(msg)
 
 
 async def next_event():

+ 1 - 1
pywebio/output.py

@@ -121,7 +121,7 @@ def _put_content(type, ws=None, anchor=None, before=None, after=None, **other_sp
         spec['after'] = _AnchorTPL % after
 
     msg = dict(command="output", spec=spec)
-    (ws or Global.active_ws).write_message(json.dumps(msg))
+    (ws or Global.active_ws).add_server_msg(msg)
 
 
 def put_text(text, inline=False, anchor=None, before=None, after=None):

+ 9 - 58
pywebio/platform/tornado.py

@@ -1,16 +1,9 @@
 import json
-from collections import OrderedDict
 
 import tornado
 import tornado.websocket
-from tornado.gen import coroutine
-from tornado.log import gen_log
-
-from ..framework import Task
-
 from .. import project_dir
-import sys, traceback
-from ..output import put_markdown
+from ..framework import WebIOSession
 
 STATIC_PATH = '%s/html' % project_dir
 
@@ -25,66 +18,24 @@ def ws_handler(coro_func, debug=True):
             # Non-None enables compression with default options.
             return {}
 
-        @coroutine
+        def on_server_msg(self, controller):
+            while controller.unhandled_server_msgs:
+                msg = controller.unhandled_server_msgs.pop()
+                self.write_message(json.dumps(msg))
+
         def open(self):
             print("WebSocket opened")
             self.set_nodelay(True)
-            ############
-            self.coros = {}  # coro_id -> coro
-            # self.callbacks = OrderedDict()  # UI元素时的回调, callback_id -> (coro, save)
-            # self.mark2id = {}  # mark_name -> mark_id
-
-            self._closed = False
-            self.inactive_coro_instances = []  # 待激活的协程实例列表
-
-            self.main_task = Task(coro_func(), ws=self)
-            self.coros[self.main_task.coro_id] = self.main_task
-
-            self.step_task(self.main_task)
 
-        def step_task(self, task, result=None):
-            task.step(result)
-            if task.task_finished:
-                gen_log.debug('del self.coros[%s]', task.coro_id)
-                del self.coros[task.coro_id]
-
-            while self.inactive_coro_instances:
-                coro = self.inactive_coro_instances.pop()
-                task = Task(coro, ws=self)
-                self.coros[task.coro_id] = task
-                task.step()
-                if self.coros[task.coro_id].task_finished:
-                    gen_log.debug('del self.coros[%s]', task.coro_id)
-                    del self.coros[task.coro_id]
-
-            if self.main_task.task_finished:
-                for t in self.coros:
-                    t.cancel()
-                self.close()
+            self.controller = WebIOSession(coro_func, server_msg_listener=self.on_server_msg)
 
         def on_message(self, message):
             # print('on_message', message)
             data = json.loads(message)
-            coro_id = data['coro_id']
-            coro = self.coros.get(coro_id)
-            if not coro:
-                gen_log.error('coro not found, coro_id:%s', coro_id)
-                return
-
-            self.step_task(coro, data)
-
-        def on_coro_error(self):
-            type, value, tb = sys.exc_info()
-            tb_len = len(list(traceback.walk_tb(tb)))
-            lines = traceback.format_exception(type, value, tb, limit=1 - tb_len)
-            traceback_msg = ''.join(lines)
-            put_markdown("发生错误:\n```\n%s\n```" % traceback_msg)
+            self.controller.add_client_msg(data)
 
         def on_close(self):
-            self._closed = True
+            self.controller.close()
             print("WebSocket closed")
 
-        def closed(self):
-            return self._closed
-
     return WSHandler