|
@@ -30,14 +30,10 @@ class _context:
|
|
|
|
|
|
class CoroutineBasedSession(AbstractSession):
|
|
class CoroutineBasedSession(AbstractSession):
|
|
"""
|
|
"""
|
|
- 一个PyWebIO任务会话, 由不同的后端Backend创建并维护
|
|
|
|
|
|
+ 基于协程的任务会话
|
|
|
|
|
|
- WebIOSession是不同的后端Backend与协程交互的桥梁:
|
|
|
|
- 后端Backend在接收到用户浏览器的数据后,会通过调用 ``send_client_event`` 来通知会话,进而由Session驱动协程的运行。
|
|
|
|
- Task内在调用输入输出函数后,会调用 ``send_task_command`` 向会话发送输入输出消息指令, Session将其保存并留给后端Backend处理。
|
|
|
|
-
|
|
|
|
- .. note::
|
|
|
|
- 后端Backend在相应on_session_close时关闭连接时,需要保证会话内的所有消息都传送到了客户端
|
|
|
|
|
|
+ 当主协程任务和会话内所有通过 `run_async` 注册的协程都退出后,会话关闭。
|
|
|
|
+ 当用户浏览器主动关闭会话,CoroutineBasedSession.close 被调用, 协程任务和会话内所有通过 `run_async` 注册的协程都被关闭。
|
|
"""
|
|
"""
|
|
|
|
|
|
@staticmethod
|
|
@staticmethod
|
|
@@ -69,32 +65,35 @@ class CoroutineBasedSession(AbstractSession):
|
|
|
|
|
|
self._closed = False
|
|
self._closed = False
|
|
self.inactive_coro_instances = [] # 待激活的协程实例列表
|
|
self.inactive_coro_instances = [] # 待激活的协程实例列表
|
|
|
|
+ self._not_closed_coro_cnt = 1 # 当前会话未结束运行的协程数量。当 self._not_closed_coro_cnt == 0 时,会话结束。
|
|
|
|
|
|
- self.main_task = Task(target(), session=self, on_coro_stop=self._on_main_task_finish)
|
|
|
|
- self.coros[self.main_task.coro_id] = self.main_task
|
|
|
|
|
|
+ main_task = Task(target(), session=self, on_coro_stop=self._on_task_finish)
|
|
|
|
+ self.coros[main_task.coro_id] = main_task
|
|
|
|
|
|
- self._step_task(self.main_task)
|
|
|
|
|
|
+ self._step_task(main_task)
|
|
|
|
|
|
def _step_task(self, task, result=None):
|
|
def _step_task(self, task, result=None):
|
|
task.step(result)
|
|
task.step(result)
|
|
- if task.task_finished and task.coro_id in self.coros:
|
|
|
|
|
|
+ if task.task_closed and task.coro_id in self.coros:
|
|
# 若task 为main task,则 task.step(result) 结束后,可能task已经结束,self.coros已被清理
|
|
# 若task 为main task,则 task.step(result) 结束后,可能task已经结束,self.coros已被清理
|
|
logger.debug('del self.coros[%s]', task.coro_id)
|
|
logger.debug('del self.coros[%s]', task.coro_id)
|
|
del self.coros[task.coro_id]
|
|
del self.coros[task.coro_id]
|
|
|
|
|
|
- while self.inactive_coro_instances and not self.main_task.task_finished:
|
|
|
|
|
|
+ while self.inactive_coro_instances:
|
|
coro = self.inactive_coro_instances.pop()
|
|
coro = self.inactive_coro_instances.pop()
|
|
- sub_task = Task(coro, session=self)
|
|
|
|
|
|
+ sub_task = Task(coro, session=self, on_coro_stop=self._on_task_finish)
|
|
self.coros[sub_task.coro_id] = sub_task
|
|
self.coros[sub_task.coro_id] = sub_task
|
|
sub_task.step()
|
|
sub_task.step()
|
|
- if sub_task.task_finished:
|
|
|
|
|
|
+ if sub_task.task_closed:
|
|
logger.debug('del self.coros[%s]', sub_task.coro_id)
|
|
logger.debug('del self.coros[%s]', sub_task.coro_id)
|
|
del self.coros[sub_task.coro_id]
|
|
del self.coros[sub_task.coro_id]
|
|
|
|
|
|
- def _on_main_task_finish(self):
|
|
|
|
- self.send_task_command(dict(command='close_session'))
|
|
|
|
- self._on_session_close()
|
|
|
|
- self.close()
|
|
|
|
|
|
+ def _on_task_finish(self):
|
|
|
|
+ self._not_closed_coro_cnt -= 1
|
|
|
|
+ if self._not_closed_coro_cnt <= 0:
|
|
|
|
+ self.send_task_command(dict(command='close_session'))
|
|
|
|
+ self._on_session_close()
|
|
|
|
+ self.close()
|
|
|
|
|
|
def send_task_command(self, command):
|
|
def send_task_command(self, command):
|
|
"""向会话发送来自协程内的消息
|
|
"""向会话发送来自协程内的消息
|
|
@@ -192,7 +191,12 @@ class CoroutineBasedSession(AbstractSession):
|
|
return callback_task.coro_id
|
|
return callback_task.coro_id
|
|
|
|
|
|
def run_async(self, coro_obj):
|
|
def run_async(self, coro_obj):
|
|
|
|
+ """异步运行协程对象。可以在协程内调用 PyWebIO 交互函数
|
|
|
|
+
|
|
|
|
+ :param coro_obj: 协程对象
|
|
|
|
+ """
|
|
self.inactive_coro_instances.append(coro_obj)
|
|
self.inactive_coro_instances.append(coro_obj)
|
|
|
|
+ self._not_closed_coro_cnt += 1
|
|
|
|
|
|
async def run_asyncio_coroutine(self, coro_obj):
|
|
async def run_asyncio_coroutine(self, coro_obj):
|
|
"""若会话线程和运行事件的线程不是同一个线程,需要用 asyncio_coroutine 来运行asyncio中的协程"""
|
|
"""若会话线程和运行事件的线程不是同一个线程,需要用 asyncio_coroutine 来运行asyncio中的协程"""
|
|
@@ -230,7 +234,7 @@ class Task:
|
|
self.coro = coro
|
|
self.coro = coro
|
|
self.coro_id = None
|
|
self.coro_id = None
|
|
self.result = None
|
|
self.result = None
|
|
- self.task_finished = False # 任务完毕/取消
|
|
|
|
|
|
+ self.task_closed = False # 任务完毕/取消
|
|
self.on_coro_stop = on_coro_stop or (lambda: None)
|
|
self.on_coro_stop = on_coro_stop or (lambda: None)
|
|
|
|
|
|
self.coro_id = self.gen_coro_id(self.coro)
|
|
self.coro_id = self.gen_coro_id(self.coro)
|
|
@@ -247,11 +251,13 @@ class Task:
|
|
except StopIteration as e:
|
|
except StopIteration as e:
|
|
if len(e.args) == 1:
|
|
if len(e.args) == 1:
|
|
self.result = e.args[0]
|
|
self.result = e.args[0]
|
|
- self.task_finished = True
|
|
|
|
|
|
+ self.task_closed = True
|
|
logger.debug('Task[%s] finished', self.coro_id)
|
|
logger.debug('Task[%s] finished', self.coro_id)
|
|
self.on_coro_stop()
|
|
self.on_coro_stop()
|
|
except Exception as e:
|
|
except Exception as e:
|
|
self.session.on_task_exception()
|
|
self.session.on_task_exception()
|
|
|
|
+ self.task_closed = True
|
|
|
|
+ self.on_coro_stop()
|
|
|
|
|
|
future = None
|
|
future = None
|
|
if isinstance(coro_yield, WebIOFuture):
|
|
if isinstance(coro_yield, WebIOFuture):
|
|
@@ -275,8 +281,8 @@ class Task:
|
|
_, f = self.pending_futures.popitem()
|
|
_, f = self.pending_futures.popitem()
|
|
f.cancel()
|
|
f.cancel()
|
|
|
|
|
|
- self.task_finished = True
|
|
|
|
|
|
+ self.task_closed = True
|
|
|
|
|
|
def __del__(self):
|
|
def __del__(self):
|
|
- if not self.task_finished:
|
|
|
|
|
|
+ if not self.task_closed:
|
|
logger.warning('Task[%s] not finished when destroy', self.coro_id)
|
|
logger.warning('Task[%s] not finished when destroy', self.coro_id)
|