Преглед на файлове

Add message history and retransmission (#3199)

This PR attempts to resolve #3143 by adding a message history to
`outbox` and providing for the retransmission of missed messages in
order to resynchronize the client's state during a reconnection. If this
cannot be accomplished, a reload of the page is triggered. The goal of
this is to prevent a connected client's state from ever being out of
sync with the server.

For the auto-index page, a history duration of 30 seconds was
arbitrarily chosen. Since this value only determines when the UI is
updated through resending messages instead of a page reload, the UI
should stay properly synchronized regardless of this value.

For a `ui.page`, the history duration is computed based on the expected
lifetime of the `client` object. Currently, with the default
`reconnect_timeout = 3.0`, this is a max of 9 seconds. With this change,
a re-evaluation of this default could be warranted. Now that UI state
can be resynchronized indefinitely, discarding the user's page after
only 5-9s of disconnection seems premature. See
https://github.com/zauberzeug/nicegui/issues/3143#issuecomment-2151376569
for more.

---

Open tasks (October 24, 2024):

- [x] `message_history_length` isn't being used
- [x] handle reconnect when next message ID has already been pruned
- [x] fix failing pytests
- [x] fix test_no_object_duplication_on_index_client
- [x] Should the auto-index client reload when trying to reconnect
(because there is no message history)? -> No.
- [x] ack message to keep history short
- [x] thorough test
- [x] test On Air

---------

Co-authored-by: Falko Schindler <falko@zauberzeug.com>
Co-authored-by: Rodja Trappe <rodja@zauberzeug.com>
Aaron Fuller преди 5 месеца
родител
ревизия
93be790460
променени са 10 файла, в които са добавени 123 реда и са изтрити 24 реда
  1. 9 1
      nicegui/air.py
  2. 3 0
      nicegui/app/app_config.py
  3. 10 8
      nicegui/client.py
  4. 11 3
      nicegui/nicegui.py
  5. 58 10
      nicegui/outbox.py
  6. 5 1
      nicegui/page.py
  7. 20 1
      nicegui/static/nicegui.js
  8. 1 0
      nicegui/testing/general_fixtures.py
  9. 3 0
      nicegui/ui_run.py
  10. 3 0
      nicegui/ui_run_with.py

+ 9 - 1
nicegui/air.py

@@ -134,7 +134,7 @@ class Air:
                 core.app.storage.copy_tab(data['old_tab_id'], data['tab_id'])
             client.tab_id = data['tab_id']
             client.on_air = True
-            client.handle_handshake()
+            client.handle_handshake(data.get('next_message_id'))
             return True
 
         @self.relay.on('client_disconnect')
@@ -178,6 +178,14 @@ class Air:
             client = Client.instances[client_id]
             client.handle_javascript_response(data['msg'])
 
+        @self.relay.on('ack')
+        def _handle_ack(data: Dict[str, Any]) -> None:
+            client_id = data['client_id']
+            if client_id not in Client.instances:
+                return
+            client = Client.instances[client_id]
+            client.outbox.prune_history(data['msg']['next_message_id'])
+
         @self.relay.on('out_of_time')
         async def _handle_out_of_time() -> None:
             print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)

+ 3 - 0
nicegui/app/app_config.py

@@ -32,6 +32,7 @@ class AppConfig:
     language: Language = field(init=False)
     binding_refresh_interval: float = field(init=False)
     reconnect_timeout: float = field(init=False)
+    message_history_length: int = field(init=False)
     tailwind: bool = field(init=False)
     prod_js: bool = field(init=False)
     show_welcome_message: bool = field(init=False)
@@ -47,6 +48,7 @@ class AppConfig:
                        language: Language,
                        binding_refresh_interval: float,
                        reconnect_timeout: float,
+                       message_history_length: int,
                        tailwind: bool,
                        prod_js: bool,
                        show_welcome_message: bool,
@@ -60,6 +62,7 @@ class AppConfig:
         self.language = language
         self.binding_refresh_interval = binding_refresh_interval
         self.reconnect_timeout = reconnect_timeout
+        self.message_history_length = message_history_length
         self.tailwind = tailwind
         self.prod_js = prod_js
         self.show_welcome_message = show_welcome_message

+ 10 - 8
nicegui/client.py

@@ -63,6 +63,7 @@ class Client:
         self._deleted = False
         self.tab_id: Optional[str] = None
 
+        self.page = page
         self.outbox = Outbox(self)
 
         with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
@@ -75,7 +76,6 @@ class Client:
         self._head_html = ''
         self._body_html = ''
 
-        self.page = page
         self.storage = ObservableDict()
 
         self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
@@ -123,7 +123,11 @@ class Client:
         elements = json.dumps({
             id: element._to_dict() for id, element in self.elements.items()  # pylint: disable=protected-access
         })
-        socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id}
+        socket_io_js_query_params = {
+            **core.app.config.socket_io_js_query_params,
+            'client_id': self.id,
+            'next_message_id': self.outbox.next_message_id,
+        }
         vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values())
         return templates.TemplateResponse(
             request=request,
@@ -227,8 +231,10 @@ class Client:
         """Add a callback to be invoked when the client disconnects."""
         self.disconnect_handlers.append(handler)
 
-    def handle_handshake(self) -> None:
+    def handle_handshake(self, next_message_id: Optional[int]) -> None:
         """Cancel pending disconnect task and invoke connect handlers."""
+        if next_message_id is not None:
+            self.outbox.try_rewind(next_message_id)
         if self._disconnect_task:
             self._disconnect_task.cancel()
             self._disconnect_task = None
@@ -241,11 +247,7 @@ class Client:
     def handle_disconnect(self) -> None:
         """Wait for the browser to reconnect; invoke disconnect handlers if it doesn't."""
         async def handle_disconnect() -> None:
-            if self.page.reconnect_timeout is not None:
-                delay = self.page.reconnect_timeout
-            else:
-                delay = core.app.config.reconnect_timeout  # pylint: disable=protected-access
-            await asyncio.sleep(delay)
+            await asyncio.sleep(self.page.resolve_reconnect_timeout())
             for t in self.disconnect_handlers:
                 self.safe_invoke(t)
             for t in core.app._disconnect_handlers:  # pylint: disable=protected-access

+ 11 - 3
nicegui/nicegui.py

@@ -3,7 +3,7 @@ import mimetypes
 import urllib.parse
 from contextlib import asynccontextmanager
 from pathlib import Path
-from typing import Dict
+from typing import Any, Dict
 
 import socketio
 from fastapi import HTTPException, Request
@@ -163,7 +163,7 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp
 
 
 @sio.on('handshake')
-async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
+async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool:
     client = Client.instances.get(data['client_id'])
     if not client:
         return False
@@ -175,7 +175,7 @@ async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
     else:
         client.environ = sio.get_environ(sid)
         await sio.enter_room(sid, client.id)
-    client.handle_handshake()
+    client.handle_handshake(data.get('next_message_id'))
     return True
 
 
@@ -203,3 +203,11 @@ def _on_javascript_response(_: str, msg: Dict) -> None:
     if not client:
         return
     client.handle_javascript_response(msg)
+
+
+@sio.on('ack')
+def _on_ack(_: str, msg: Dict) -> None:
+    client = Client.instances.get(msg['client_id'])
+    if not client:
+        return
+    client.outbox.prune_history(msg['next_message_id'])

+ 58 - 10
nicegui/outbox.py

@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import asyncio
+import time
 from collections import deque
 from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple
 
@@ -10,10 +11,16 @@ if TYPE_CHECKING:
     from .client import Client
     from .element import Element
 
-ClientId = str
 ElementId = int
+
+ClientId = str
 MessageType = str
-Message = Tuple[ClientId, MessageType, Any]
+Payload = Any
+Message = Tuple[ClientId, MessageType, Payload]
+
+MessageId = int
+MessageTime = float
+HistoryEntry = Tuple[MessageId, MessageTime, Message]
 
 
 class Outbox:
@@ -22,8 +29,12 @@ class Outbox:
         self.client = client
         self.updates: Dict[ElementId, Optional[Element]] = {}
         self.messages: Deque[Message] = deque()
+        self.message_history: Deque[HistoryEntry] = deque()
+        self.next_message_id: int = 0
+
         self._should_stop = False
         self._enqueue_event: Optional[asyncio.Event] = None
+
         if core.app.is_started:
             background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
         else:
@@ -46,7 +57,7 @@ class Outbox:
         self.updates[element.id] = None
         self._set_enqueue_event()
 
-    def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
+    def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None:
         """Enqueue a message for the given client."""
         self.client.check_existence()
         self.messages.append((target_id, message_type, data))
@@ -77,12 +88,12 @@ class Outbox:
                         element_id: None if element is None else element._to_dict()  # pylint: disable=protected-access
                         for element_id, element in self.updates.items()
                     }
-                    coros.append(self._emit('update', data, self.client.id))
+                    coros.append(self._emit((self.client.id, 'update', data)))
                     self.updates.clear()
 
                 if self.messages:
-                    for target_id, message_type, data in self.messages:
-                        coros.append(self._emit(message_type, data, target_id))
+                    for message in self.messages:
+                        coros.append(self._emit(message))
                     self.messages.clear()
 
                 for coro in coros:
@@ -95,10 +106,47 @@ class Outbox:
                 core.app.handle_exception(e)
                 await asyncio.sleep(0.1)
 
-    async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
-        await core.sio.emit(message_type, data, room=target_id)
-        if core.air is not None and core.air.is_air_target(target_id):
-            await core.air.emit(message_type, data, room=target_id)
+    async def _emit(self, message: Message) -> None:
+        client_id, message_type, data = message
+        data['_id'] = self.next_message_id
+
+        await core.sio.emit(message_type, data, room=client_id)
+        if core.air is not None and core.air.is_air_target(client_id):
+            await core.air.emit(message_type, data, room=client_id)
+
+        if not self.client.shared:
+            self.message_history.append((self.next_message_id, time.time(), message))
+            max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout()
+            while self.message_history and self.message_history[0][1] < time.time() - max_age:
+                self.message_history.popleft()
+            while len(self.message_history) > core.app.config.message_history_length:
+                self.message_history.popleft()
+
+        self.next_message_id += 1
+
+    def try_rewind(self, target_message_id: MessageId) -> None:
+        """Rewind to the given message ID and discard all messages before it."""
+        # nothing to do, the next message ID is already the target message ID
+        if self.next_message_id == target_message_id:
+            return
+
+        # rewind to the target message ID
+        while self.message_history:
+            self.next_message_id, _, message = self.message_history.pop()
+            self.messages.appendleft(message)
+            if self.next_message_id == target_message_id:
+                self.message_history.clear()
+                self._set_enqueue_event()
+                return
+
+        # target message ID not found, reload the page
+        if not self.client.shared:
+            self.client.run_javascript('window.location.reload()')
+
+    def prune_history(self, next_message_id: MessageId) -> None:
+        """Prune the message history up to the given message ID."""
+        while self.message_history and self.message_history[0][0] < next_message_id:
+            self.message_history.popleft()
 
     def stop(self) -> None:
         """Stop the outbox loop."""

+ 5 - 1
nicegui/page.py

@@ -56,7 +56,7 @@ class page:
         :param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command)
         :param language: language of the page (defaults to `language` argument of `run` command)
         :param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds)
-        :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds)
+        :param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command))
         :param api_router: APIRouter instance to use, can be left `None` to use the default
         :param kwargs: additional keyword arguments passed to FastAPI's @app.get method
         """
@@ -94,6 +94,10 @@ class page:
         """Return the language of the page."""
         return self.language if self.language is not ... else core.app.config.language
 
+    def resolve_reconnect_timeout(self) -> float:
+        """Return the reconnect_timeout of the page."""
+        return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout
+
     def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
         core.app.remove_route(self.path)  # NOTE make sure only the latest route definition is used
         parameters_of_decorated_func = list(inspect.signature(func).parameters.keys())

+ 20 - 1
nicegui/static/nicegui.js

@@ -269,6 +269,15 @@ function download(src, filename, mediaType, prefix) {
   }
 }
 
+function ack() {
+  if (window.ackedMessageId >= window.nextMessageId) return;
+  window.socket.emit("ack", {
+    client_id: window.clientId,
+    next_message_id: window.nextMessageId,
+  });
+  window.ackedMessageId = window.nextMessageId;
+}
+
 async function loadDependencies(element, prefix, version) {
   if (element.component) {
     const { name, key, tag } = element.component;
@@ -310,6 +319,7 @@ window.onbeforeunload = function () {
 
 function createApp(elements, options) {
   replaceUndefinedAttributes(elements, 0);
+  setInterval(() => ack(), 3000);
   return (app = Vue.createApp({
     data() {
       return {
@@ -324,6 +334,8 @@ function createApp(elements, options) {
       window.clientId = options.query.client_id;
       const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host;
       window.path_prefix = options.prefix;
+      window.nextMessageId = options.query.next_message_id;
+      window.ackedMessageId = -1;
       window.socket = io(url, {
         path: `${options.prefix}/_nicegui_ws/socket.io`,
         query: options.query,
@@ -337,6 +349,7 @@ function createApp(elements, options) {
             client_id: window.clientId,
             tab_id: TAB_ID,
             old_tab_id: OLD_TAB_ID,
+            next_message_id: window.nextMessageId,
           };
           window.socket.emit("handshake", args, (ok) => {
             if (!ok) {
@@ -375,7 +388,7 @@ function createApp(elements, options) {
             replaceUndefinedAttributes(this.elements, id);
           }
         },
-        run_javascript: (msg) => runJavascript(msg["code"], msg["request_id"]),
+        run_javascript: (msg) => runJavascript(msg.code, msg.request_id),
         open: (msg) => {
           const url = msg.path.startsWith("/") ? options.prefix + msg.path : msg.path;
           const target = msg.new_tab ? "_blank" : "_self";
@@ -388,6 +401,12 @@ function createApp(elements, options) {
       let isProcessingSocketMessage = false;
       for (const [event, handler] of Object.entries(messageHandlers)) {
         window.socket.on(event, async (...args) => {
+          if (args.length > 0 && args[0]._id !== undefined) {
+            const message_id = args[0]._id;
+            if (message_id < window.nextMessageId) return;
+            window.nextMessageId = message_id + 1;
+            delete args[0]._id;
+          }
           socketMessageQueue.push(() => handler(...args));
           if (!isProcessingSocketMessage) {
             while (socketMessageQueue.length > 0) {

+ 1 - 0
nicegui/testing/general_fixtures.py

@@ -91,6 +91,7 @@ def prepare_simulation(request: pytest.FixtureRequest) -> None:
         language='en-US',
         binding_refresh_interval=0.1,
         reconnect_timeout=3.0,
+        message_history_length=1000,
         tailwind=True,
         prod_js=True,
         show_welcome_message=False,

+ 3 - 0
nicegui/ui_run.py

@@ -53,6 +53,7 @@ def run(*,
         language: Language = 'en-US',
         binding_refresh_interval: float = 0.1,
         reconnect_timeout: float = 3.0,
+        message_history_length: int = 1000,
         fastapi_docs: Union[bool, DocsConfig] = False,
         show: bool = True,
         on_air: Optional[Union[str, Literal[True]]] = None,
@@ -86,6 +87,7 @@ def run(*,
     :param language: language for Quasar elements (default: `'en-US'`)
     :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
     :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
+    :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
     :param fastapi_docs: enable FastAPI's automatic documentation with Swagger UI, ReDoc, and OpenAPI JSON (bool or dictionary as described `here<https://fastapi.tiangolo.com/tutorial/metadata/>`_, default: `False`)
     :param show: automatically open the UI in a browser tab (default: `True`)
     :param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
@@ -114,6 +116,7 @@ def run(*,
         language=language,
         binding_refresh_interval=binding_refresh_interval,
         reconnect_timeout=reconnect_timeout,
+        message_history_length=message_history_length,
         tailwind=tailwind,
         prod_js=prod_js,
         show_welcome_message=show_welcome_message,

+ 3 - 0
nicegui/ui_run_with.py

@@ -19,6 +19,7 @@ def run_with(
     language: Language = 'en-US',
     binding_refresh_interval: float = 0.1,
     reconnect_timeout: float = 3.0,
+    message_history_length: int = 1000,
     mount_path: str = '/',
     on_air: Optional[Union[str, Literal[True]]] = None,
     tailwind: bool = True,
@@ -36,6 +37,7 @@ def run_with(
     :param language: language for Quasar elements (default: `'en-US'`)
     :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
     :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
+    :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
     :param mount_path: mount NiceGUI at this path (default: `'/'`)
     :param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
     :param tailwind: whether to use Tailwind CSS (experimental, default: `True`)
@@ -52,6 +54,7 @@ def run_with(
         language=language,
         binding_refresh_interval=binding_refresh_interval,
         reconnect_timeout=reconnect_timeout,
+        message_history_length=message_history_length,
         tailwind=tailwind,
         prod_js=prod_js,
         show_welcome_message=show_welcome_message,