Просмотр исходного кода

`run_async` return TaskHandle, which can be used later to close the task

wangweimin 5 лет назад
Родитель
Сommit
72eec915c5
2 измененных файлов с 39 добавлено и 22 удалено
  1. 2 1
      pywebio/session/__init__.py
  2. 37 21
      pywebio/session/coroutinebased.py

+ 2 - 1
pywebio/session/__init__.py

@@ -100,8 +100,9 @@ def run_async(coro_obj):
     """异步运行协程对象。协程中依然可以调用 PyWebIO 交互函数。 仅能在 CoroutineBasedSession 会话上下文中调用
 
     :param coro_obj: 协程对象
+    :return: An instance of  `TaskHandle <pywebio.session.coroutinebased.TaskHandle>` is returned, which can be used later to close the task.
     """
-    get_current_session().run_async(coro_obj)
+    return get_current_session().run_async(coro_obj)
 
 
 @check_session_impl(CoroutineBasedSession)

+ 37 - 21
pywebio/session/coroutinebased.py

@@ -64,7 +64,6 @@ class CoroutineBasedSession(AbstractSession):
         self.coros = {}  # coro_task_id -> coro
 
         self._closed = False
-        self.inactive_coro_instances = []  # 待激活的协程实例列表
         self._not_closed_coro_cnt = 1  # 当前会话未结束运行的协程数量。当 self._not_closed_coro_cnt == 0 时,会话结束。
 
         main_task = Task(target(), session=self, on_coro_stop=self._on_task_finish)
@@ -74,22 +73,14 @@ class CoroutineBasedSession(AbstractSession):
 
     def _step_task(self, task, result=None):
         task.step(result)
-        if task.task_closed and task.coro_id in self.coros:
-            # 若task 为main task,则 task.step(result) 结束后,可能task已经结束,self.coros已被清理
+
+    def _on_task_finish(self, task: "Task"):
+        self._not_closed_coro_cnt -= 1
+
+        if task.coro_id in self.coros:
             logger.debug('del self.coros[%s]', task.coro_id)
             del self.coros[task.coro_id]
 
-        while self.inactive_coro_instances:
-            coro = self.inactive_coro_instances.pop()
-            sub_task = Task(coro, session=self, on_coro_stop=self._on_task_finish)
-            self.coros[sub_task.coro_id] = sub_task
-            sub_task.step()
-            if sub_task.task_closed:
-                logger.debug('del self.coros[%s]', sub_task.coro_id)
-                del self.coros[sub_task.coro_id]
-
-    def _on_task_finish(self):
-        self._not_closed_coro_cnt -= 1
         if self._not_closed_coro_cnt <= 0:
             self.send_task_command(dict(command='close_session'))
             self._on_session_close()
@@ -130,10 +121,6 @@ class CoroutineBasedSession(AbstractSession):
             t.close()
         self.coros = {}  # delete session tasks
 
-        while self.inactive_coro_instances:
-            coro = self.inactive_coro_instances.pop()
-            coro.close()
-
     def close(self):
         """关闭当前Session。由Backend调用"""
         self._cleanup()
@@ -194,17 +181,38 @@ class CoroutineBasedSession(AbstractSession):
         """异步运行协程对象。可以在协程内调用 PyWebIO 交互函数
 
         :param coro_obj: 协程对象
+        :return: An instance of  `TaskHandle` is returned, which can be used later to close the task.
         """
-        self.inactive_coro_instances.append(coro_obj)
         self._not_closed_coro_cnt += 1
 
+        task = Task(coro_obj, session=self, on_coro_stop=self._on_task_finish)
+        self.coros[task.coro_id] = task
+        asyncio.get_event_loop().call_soon(task.step)
+        return task.task_handle()
+
     async def run_asyncio_coroutine(self, coro_obj):
         """若会话线程和运行事件的线程不是同一个线程,需要用 asyncio_coroutine 来运行asyncio中的协程"""
         res = await WebIOFuture(coro=coro_obj)
         return res
 
 
+class TaskHandle:
+
+    def __init__(self, close, closed):
+        self._close = close
+        self._closed = closed
+
+    def close(self):
+        """关闭任务"""
+        return self._close()
+
+    def closed(self):
+        """返回任务是否关闭"""
+        return self._closed()
+
+
 class Task:
+
     @contextmanager
     def session_context(self):
         """
@@ -253,11 +261,11 @@ class Task:
                     self.result = e.args[0]
                 self.task_closed = True
                 logger.debug('Task[%s] finished', self.coro_id)
-                self.on_coro_stop()
+                self.on_coro_stop(self)
             except Exception as e:
                 self.session.on_task_exception()
                 self.task_closed = True
-                self.on_coro_stop()
+                self.on_coro_stop(self)
 
         future = None
         if isinstance(coro_yield, WebIOFuture):
@@ -275,6 +283,9 @@ class Task:
             self.step(future.result())
 
     def close(self):
+        if self.task_closed:
+            return
+
         logger.debug('Task[%s] closed', self.coro_id)
         self.coro.close()
         while self.pending_futures:
@@ -282,7 +293,12 @@ class Task:
             f.cancel()
 
         self.task_closed = True
+        self.on_coro_stop(self)
 
     def __del__(self):
         if not self.task_closed:
             logger.warning('Task[%s] not finished when destroy', self.coro_id)
+
+    def task_handle(self):
+        handle = TaskHandle(close=self.close, closed=lambda: self.task_closed)
+        return handle