# outbox.py 1.6 KB

import asyncio
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Any, DefaultDict, Deque, Dict, Tuple

from . import globals

if TYPE_CHECKING:
    from .element import Element
  7. ClientId = str
  8. ElementId = int
  9. MessageType = str
  10. Message = Tuple[ClientId, MessageType, Any]
  11. update_queue: DefaultDict[ClientId, Dict[ElementId, 'Element']] = defaultdict(dict)
  12. message_queue: Deque[Message] = deque()
  13. def enqueue_update(element: 'Element') -> None:
  14. update_queue[element.client.id][element.id] = element
  15. def enqueue_message(message_type: 'MessageType', data: Any, client_id: 'ClientId') -> None:
  16. message_queue.append((client_id, message_type, data))
  17. async def loop() -> None:
  18. while True:
  19. if not update_queue and not message_queue:
  20. await asyncio.sleep(0.01)
  21. continue
  22. coros = []
  23. try:
  24. for client_id, elements in update_queue.items():
  25. data = {element_id: element._to_dict() for element_id, element in elements.items()}
  26. coros.append(globals.sio.emit('update', data, room=client_id))
  27. update_queue.clear()
  28. for client_id, message_type, data in message_queue:
  29. coros.append(globals.sio.emit(message_type, data, room=client_id))
  30. message_queue.clear()
  31. for coro in coros:
  32. try:
  33. await coro
  34. except Exception as e:
  35. globals.handle_exception(e)
  36. except Exception as e:
  37. globals.handle_exception(e)
  38. await asyncio.sleep(0.1)