from __future__ import annotations

import asyncio
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any, DefaultDict, Deque, Dict, Optional, Tuple

from . import globals  # pylint: disable=redefined-builtin

if TYPE_CHECKING:
    from .element import Element

ClientId = str
ElementId = int
MessageType = str
Message = Tuple[ClientId, MessageType, Any]

# Pending element changes, grouped by client id; a ``None`` value marks a deletion.
update_queue: DefaultDict[ClientId, Dict[ElementId, Optional[Element]]] = defaultdict(dict)
# Pending messages stored as (target id, message type, payload) in enqueue order.
message_queue: Deque[Message] = deque()


def enqueue_update(element: Element) -> None:
    """Register the element as changed so that the next pass of ``loop`` emits it.

    A later call for the same client and element id overwrites the earlier entry.
    """
    client_updates = update_queue[element.client.id]
    client_updates[element.id] = element


def enqueue_delete(element: Element) -> None:
    """Register the element as deleted by storing ``None`` under its id."""
    client_updates = update_queue[element.client.id]
    client_updates[element.id] = None


def enqueue_message(message_type: MessageType, data: Any, target_id: ClientId) -> None:
    """Append a message of the given type and payload for the given target."""
    entry = (target_id, message_type, data)
    message_queue.append(entry)


async def _emit(message_type: MessageType, data: Any, target_id: ClientId) -> None:
    """Emit via ``globals.sio`` and, if the target is on air, via ``globals.air`` as well."""
    await globals.sio.emit(message_type, data, room=target_id)
    if not _is_target_on_air(target_id):
        return
    assert globals.air is not None
    await globals.air.emit(message_type, data, room=target_id)


async def loop() -> None:
    """Drain both queues forever, emitting their content to the respective targets.

    While both queues are empty the loop sleeps for 0.01 s between checks.
    Otherwise it first creates one ``'update'`` emit per client, then one emit per
    queued message, and finally awaits them one after another in that order.
    An exception raised by a single emit is passed to ``globals.app.handle_exception``
    and the remaining emits are still awaited.
    If preparing the emits fails, the exception is reported the same way and the loop
    pauses for 0.1 s; a queue that has not been cleared yet keeps its content.
    """
    while True:
        if not (update_queue or message_queue):
            await asyncio.sleep(0.01)
            continue

        pending = []
        try:
            for client_id, elements in update_queue.items():
                payload = {}
                for element_id, element in elements.items():
                    payload[element_id] = None if element is None else element._to_dict()  # pylint: disable=protected-access
                pending.append(_emit('update', payload, client_id))
            # the queue is only cleared after every update payload has been built
            update_queue.clear()

            pending.extend(
                _emit(message_type, message_data, target_id)
                for target_id, message_type, message_data in message_queue
            )
            message_queue.clear()

            for emission in pending:
                try:
                    await emission
                except Exception as e:
                    globals.app.handle_exception(e)
        except Exception as e:
            globals.app.handle_exception(e)
            await asyncio.sleep(0.1)


def _is_target_on_air(target_id: str) -> bool:
    """Tell whether emits for the target should also go through ``globals.air``.

    For an id found in ``globals.clients`` the client's ``on_air`` attribute decides;
    any other id is looked up in ``globals.sio.manager.rooms``.
    """
    if target_id not in globals.clients:
        return target_id in globals.sio.manager.rooms
    return globals.clients[target_id].on_air