outbox.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. from __future__ import annotations
  2. import asyncio
  3. from collections import defaultdict, deque
  4. from typing import TYPE_CHECKING, Any, DefaultDict, Deque, Dict, Optional, Tuple
  5. from . import globals # pylint: disable=redefined-builtin
  6. if TYPE_CHECKING:
  7. from .element import Element
  8. ClientId = str
  9. ElementId = int
  10. MessageType = str
  11. Message = Tuple[ClientId, MessageType, Any]
  12. update_queue: DefaultDict[ClientId, Dict[ElementId, Optional[Element]]] = defaultdict(dict)
  13. message_queue: Deque[Message] = deque()
  14. def enqueue_update(element: Element) -> None:
  15. """Enqueue an update for the given element."""
  16. update_queue[element.client.id][element.id] = element
  17. def enqueue_delete(element: Element) -> None:
  18. """Enqueue a deletion for the given element."""
  19. update_queue[element.client.id][element.id] = None
  20. def enqueue_message(message_type: MessageType, data: Any, target_id: ClientId) -> None:
  21. """Enqueue a message for the given client."""
  22. message_queue.append((target_id, message_type, data))
  23. async def _emit(message_type: MessageType, data: Any, target_id: ClientId) -> None:
  24. await globals.sio.emit(message_type, data, room=target_id)
  25. if _is_target_on_air(target_id):
  26. assert globals.air is not None
  27. await globals.air.emit(message_type, data, room=target_id)
  28. async def loop() -> None:
  29. """Emit queued updates and messages in an endless loop."""
  30. while True:
  31. if not update_queue and not message_queue:
  32. await asyncio.sleep(0.01)
  33. continue
  34. coros = []
  35. try:
  36. for client_id, elements in update_queue.items():
  37. data = {
  38. element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
  39. for element_id, element in elements.items()
  40. }
  41. coros.append(_emit('update', data, client_id))
  42. update_queue.clear()
  43. for target_id, message_type, data in message_queue:
  44. coros.append(_emit(message_type, data, target_id))
  45. message_queue.clear()
  46. for coro in coros:
  47. try:
  48. await coro
  49. except Exception as e:
  50. globals.app.handle_exception(e)
  51. except Exception as e:
  52. globals.app.handle_exception(e)
  53. await asyncio.sleep(0.1)
  54. def _is_target_on_air(target_id: str) -> bool:
  55. if target_id in globals.clients:
  56. return globals.clients[target_id].on_air
  57. return target_id in globals.sio.manager.rooms