|
@@ -23,35 +23,51 @@ class Outbox:
|
|
|
self.updates: Dict[ElementId, Optional[Element]] = {}
|
|
|
self.messages: Deque[Message] = deque()
|
|
|
self._should_stop = False
|
|
|
+ self._enqueue_event: Optional[asyncio.Event] = None
|
|
|
if core.app.is_started:
|
|
|
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
|
|
|
else:
|
|
|
core.app.on_startup(self.loop)
|
|
|
|
|
|
+ def _set_enqueue_event(self) -> None:
|
|
|
+ """Set the enqueue event while accounting for lazy initialization."""
|
|
|
+ if self._enqueue_event:
|
|
|
+ self._enqueue_event.set()
|
|
|
+
|
|
|
def enqueue_update(self, element: Element) -> None:
|
|
|
"""Enqueue an update for the given element."""
|
|
|
self.updates[element.id] = element
|
|
|
+ self._set_enqueue_event()
|
|
|
|
|
|
def enqueue_delete(self, element: Element) -> None:
|
|
|
"""Enqueue a deletion for the given element."""
|
|
|
self.updates[element.id] = None
|
|
|
+ self._set_enqueue_event()
|
|
|
|
|
|
def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
|
|
|
"""Enqueue a message for the given client."""
|
|
|
self.messages.append((target_id, message_type, data))
|
|
|
+ self._set_enqueue_event()
|
|
|
|
|
|
async def loop(self) -> None:
|
|
|
"""Send updates and messages to all clients in an endless loop."""
|
|
|
+ self._enqueue_event = asyncio.Event()
|
|
|
+ self._enqueue_event.set()
|
|
|
+
|
|
|
while not self._should_stop:
|
|
|
try:
|
|
|
- await asyncio.sleep(0.01)
|
|
|
-
|
|
|
- if not self.updates and not self.messages:
|
|
|
- continue
|
|
|
+ if not self._enqueue_event.is_set():
|
|
|
+ try:
|
|
|
+ await asyncio.wait_for(self._enqueue_event.wait(), timeout=1.0)
|
|
|
+ except (TimeoutError, asyncio.TimeoutError):
|
|
|
+ continue
|
|
|
|
|
|
if not self.client.has_socket_connection:
|
|
|
+ await asyncio.sleep(0.1)
|
|
|
continue
|
|
|
|
|
|
+ self._enqueue_event.clear()
|
|
|
+
|
|
|
coros = []
|
|
|
data = {
|
|
|
element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
|