Quellcode durchsuchen

Run init of app.storage.general as early as possible (#4355)

When introducing Redis storage in #4074, the initialization of
persistent storage was made async to not block the main thread when
loading data. For that to work the init must be done async -- which
introduced #4352. This PR fixes the issue by moving the init of
`app.storage.general` from app startup to import time and using
`asyncio.run` to create a temporary event loop for the init.

---------

Co-authored-by: Falko Schindler <falko@zauberzeug.com>
Rodja Trappe vor 2 Monaten
Ursprung
Commit
b71074ef73

+ 0 - 1
nicegui/app/app.py

@@ -46,7 +46,6 @@ class App(FastAPI):
         self._disconnect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
         self._exception_handlers: List[Callable[..., Any]] = [log.exception]
 
-        self.on_startup(self.storage.general.initialize)
         self.on_shutdown(self.storage.on_shutdown)
 
     @property

+ 2 - 2
nicegui/nicegui.py

@@ -43,8 +43,8 @@ class SocketIoApp(socketio.ASGIApp):
 
 
 core.app = app = App(default_response_class=NiceGUIJSONResponse, lifespan=_lifespan)
-# NOTE we use custom json module which wraps orjson
-core.sio = sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*', json=json)
+core.app.storage.general.initialize_sync()
+core.sio = sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*', json=json)  # custom orjson wrapper
 sio_app = SocketIoApp(socketio_server=sio, socketio_path='/socket.io')
 app.mount('/_nicegui_ws/', sio_app)
 

+ 10 - 0
nicegui/persistence/file_persistent_dict.py

@@ -28,6 +28,16 @@ class FilePersistentDict(PersistentDict):
         except Exception:
             log.warning(f'Could not load storage file {self.filepath}')
 
+    def initialize_sync(self) -> None:
+        try:
+            if self.filepath.exists():
+                data = json.loads(self.filepath.read_text(encoding=self.encoding))
+            else:
+                data = {}
+            self.update(data)
+        except Exception:
+            log.warning(f'Could not load storage file {self.filepath}')
+
     def backup(self) -> None:
         """Back up the data to the given file path."""
         if not self.filepath.exists():

+ 4 - 0
nicegui/persistence/persistent_dict.py

@@ -9,5 +9,9 @@ class PersistentDict(observables.ObservableDict, abc.ABC):
     async def initialize(self) -> None:
         """Load initial data from the persistence layer."""
 
+    @abc.abstractmethod
+    def initialize_sync(self) -> None:
+        """Load initial data from the persistence layer in a synchronous context."""
+
     async def close(self) -> None:
         """Clean up the persistence layer."""

+ 25 - 2
nicegui/persistence/redis_persistent_dict.py

@@ -3,6 +3,7 @@ from ..logging import log
 from .persistent_dict import PersistentDict
 
 try:
+    import redis as redis_sync
     import redis.asyncio as redis
     optional_features.register('redis')
 except ImportError:
@@ -14,6 +15,7 @@ class RedisPersistentDict(PersistentDict):
     def __init__(self, *, url: str, id: str, key_prefix: str = 'nicegui:') -> None:  # pylint: disable=redefined-builtin
         if not optional_features.has('redis'):
             raise ImportError('Redis is not installed. Please run "pip install nicegui[redis]".')
+        self.url = url
         self.redis_client = redis.from_url(
             url,
             health_check_interval=10,
@@ -30,18 +32,39 @@ class RedisPersistentDict(PersistentDict):
         try:
             data = await self.redis_client.get(self.key)
             self.update(json.loads(data) if data else {})
+            self._start_listening()
         except Exception:
             log.warning(f'Could not load data from Redis with key {self.key}')
-        await self.pubsub.subscribe(self.key + 'changes')
 
+    def initialize_sync(self) -> None:
+        """Load initial data from Redis and start listening for changes in a synchronous context."""
+        with redis_sync.from_url(
+            self.url,
+            health_check_interval=10,
+            socket_connect_timeout=5,
+            retry_on_timeout=True,
+            socket_keepalive=True,
+        ) as redis_client_sync:
+            try:
+                data = redis_client_sync.get(self.key)
+                self.update(json.loads(data) if data else {})
+                self._start_listening()
+            except Exception:
+                log.warning(f'Could not load data from Redis with key {self.key}')
+
+    def _start_listening(self) -> None:
         async def listen():
+            await self.pubsub.subscribe(self.key + 'changes')
             async for message in self.pubsub.listen():
                 if message['type'] == 'message':
                     new_data = json.loads(message['data'])
                     if new_data != self:
                         self.update(new_data)
 
-        background_tasks.create(listen(), name=f'redis-listen-{self.key}')
+        if core.loop and core.loop.is_running():
+            background_tasks.create(listen(), name=f'redis-listen-{self.key}')
+        else:
+            core.app.on_startup(listen())
 
     def publish(self) -> None:
         """Publish the data to Redis and notify other instances."""