outbox.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. from __future__ import annotations
  2. import asyncio
  3. import time
  4. from collections import defaultdict, deque
  5. from dataclasses import dataclass, field
  6. from typing import TYPE_CHECKING, Any, DefaultDict, Deque, Dict, List, Optional, Tuple
  7. from . import core
  8. from .dataclasses import KWONLY_SLOTS
  9. if TYPE_CHECKING:
  10. from .air import Air
  11. from .client import Client
  12. from .element import Element
  13. @dataclass(**KWONLY_SLOTS)
  14. class DelayedUpdate:
  15. time: float = 0
  16. data: Dict[ElementId, Optional[Dict]] = field(default_factory=dict)
  17. @dataclass(**KWONLY_SLOTS)
  18. class DelayedMessage:
  19. time: float
  20. target_id: str
  21. message_type: str
  22. data: Any
  23. ClientId = str
  24. ElementId = int
  25. MessageType = str
  26. Message = Tuple[ClientId, MessageType, Any]
  27. waiting_updates: DefaultDict[ClientId, Dict[ElementId, Optional[Element]]] = defaultdict(dict)
  28. delayed_updates: DefaultDict[ClientId, DelayedUpdate] = defaultdict(DelayedUpdate)
  29. waiting_messages: Deque[Message] = deque()
  30. delayed_messages: List[DelayedMessage] = []
  31. def enqueue_update(element: Element) -> None:
  32. """Enqueue an update for the given element."""
  33. waiting_updates[element.client.id][element.id] = element
  34. def enqueue_delete(element: Element) -> None:
  35. """Enqueue a deletion for the given element."""
  36. waiting_updates[element.client.id][element.id] = None
  37. def enqueue_message(message_type: MessageType, data: Any, target_id: ClientId) -> None:
  38. """Enqueue a message for the given client."""
  39. waiting_messages.append((target_id, message_type, data))
  40. async def loop(air: Optional[Air], clients: Dict[str, Client]) -> None:
  41. """Emit queued updates and messages in an endless loop."""
  42. async def emit(message_type: MessageType, data: Any, target_id: ClientId) -> None:
  43. await core.sio.emit(message_type, data, room=target_id)
  44. if air is not None and air.is_air_target(target_id):
  45. await air.emit(message_type, data, room=target_id)
  46. while True:
  47. await asyncio.sleep(0.01)
  48. if not delayed_updates and not waiting_updates and not delayed_messages and not waiting_messages:
  49. continue
  50. coros = []
  51. try:
  52. # process delayed_updates
  53. for client_id in list(delayed_updates):
  54. update = delayed_updates[client_id]
  55. client = clients.get(client_id)
  56. if client is None or client.has_socket_connection:
  57. coros.append(emit('update', update.data, client_id))
  58. delayed_updates.pop(client_id)
  59. elif time.time() > update.time + 3.0:
  60. delayed_updates.pop(client_id)
  61. # process waiting_updates
  62. for client_id, elements in waiting_updates.items():
  63. data = {
  64. element_id: None if element is None else element._to_dict() # pylint: disable=protected-access
  65. for element_id, element in elements.items()
  66. }
  67. client = clients.get(client_id)
  68. if client is None or client.has_socket_connection:
  69. coros.append(emit('update', data, client_id))
  70. else:
  71. delayed_updates[client_id].time = time.time()
  72. delayed_updates[client_id].data.update(data)
  73. waiting_updates.clear()
  74. # process delayed_messages
  75. for i, message in enumerate(list(delayed_messages)):
  76. client = clients.get(message.target_id)
  77. if client is None or client.has_socket_connection:
  78. coros.append(emit(message.message_type, message.data, message.target_id))
  79. delayed_messages.pop(i)
  80. elif time.time() > message.time + 3.0:
  81. delayed_messages.pop(i)
  82. # process waiting_messages
  83. for target_id, message_type, data in waiting_messages:
  84. client = clients.get(target_id)
  85. if client is None or client.has_socket_connection:
  86. coros.append(emit(message_type, data, target_id))
  87. else:
  88. message = DelayedMessage(time=time.time(),
  89. target_id=target_id, message_type=message_type, data=data)
  90. delayed_messages.append(message)
  91. waiting_messages.clear()
  92. # run coroutines
  93. for coro in coros:
  94. try:
  95. await coro
  96. except Exception as e:
  97. core.app.handle_exception(e)
  98. except Exception as e:
  99. core.app.handle_exception(e)
  100. await asyncio.sleep(0.1)