123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- from __future__ import annotations
- import asyncio
- from collections import defaultdict, deque
- from typing import TYPE_CHECKING, Any, DefaultDict, Deque, Dict, Optional, Tuple
- from . import globals # pylint: disable=redefined-builtin
- if TYPE_CHECKING:
- from .element import Element
- ClientId = str
- ElementId = int
- MessageType = str
- Message = Tuple[ClientId, MessageType, Any]
- update_queue: DefaultDict[ClientId, Dict[ElementId, Optional[Element]]] = defaultdict(dict)
- message_queue: Deque[Message] = deque()
- def enqueue_update(element: Element) -> None:
- """Enqueue an update for the given element."""
- update_queue[element.client.id][element.id] = element
- def enqueue_delete(element: Element) -> None:
- """Enqueue a deletion for the given element."""
- update_queue[element.client.id][element.id] = None
- def enqueue_message(message_type: MessageType, data: Any, target_id: ClientId) -> None:
- """Enqueue a message for the given client."""
- message_queue.append((target_id, message_type, data))
- async def _emit(message_type: MessageType, data: Any, target_id: ClientId) -> None:
- await globals.sio.emit(message_type, data, room=target_id)
- if _is_target_on_air(target_id):
- assert globals.air is not None
- await globals.air.emit(message_type, data, room=target_id)
- async def loop() -> None:
- """Emit queued updates and messages in an endless loop."""
- while True:
- if not update_queue and not message_queue:
- await asyncio.sleep(0.01)
- continue
- coros = []
- try:
- for client_id, elements in update_queue.items():
- data = {
- element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
- for element_id, element in elements.items()
- }
- coros.append(_emit('update', data, client_id))
- update_queue.clear()
- for target_id, message_type, data in message_queue:
- coros.append(_emit(message_type, data, target_id))
- message_queue.clear()
- for coro in coros:
- try:
- await coro
- except Exception as e:
- globals.app.handle_exception(e)
- except Exception as e:
- globals.app.handle_exception(e)
- await asyncio.sleep(0.1)
- def _is_target_on_air(target_id: str) -> bool:
- if target_id in globals.clients:
- return globals.clients[target_id].on_air
- return target_id in globals.sio.manager.rooms
|