浏览代码

[ENG-4783] Handle keyspace notif for Redis logical DB > 0 (#4877)

* Enable keyspace notifications once per process
* Identify the logical db index used by the current redis connection
* Register for keyspace notifications on the active logical db

Fix #3984
Fix #4873
Masen Furer 2 月之前
父节点
当前提交
7364efffae
共有 1 个文件被更改,包括 29 次插入12 次删除
  1. 29 12
      reflex/state.py

+ 29 - 12
reflex/state.py

@@ -3205,7 +3205,7 @@ class StateManagerRedis(StateManager):
         default_factory=_default_lock_warning_threshold
     )
 
-    # The keyspace subscription string when redis is waiting for lock to be released
+    # The keyspace subscription string when redis is waiting for lock to be released.
     _redis_notify_keyspace_events: str = (
         "K"  # Enable keyspace notifications (target a particular key)
         "g"  # For generic commands (DEL, EXPIRE, etc)
@@ -3213,7 +3213,7 @@ class StateManagerRedis(StateManager):
         "e"  # For evicted events (i.e. maxmemory exceeded)
     )
 
-    # These events indicate that a lock is no longer held
+    # These events indicate that a lock is no longer held.
     _redis_keyspace_lock_release_events: Set[bytes] = {
         b"del",
         b"expire",
@@ -3221,6 +3221,12 @@ class StateManagerRedis(StateManager):
         b"evicted",
     }
 
+    # Whether keyspace notifications have been enabled.
+    _redis_notify_keyspace_events_enabled: bool = False
+
+    # The logical database number used by the redis client.
+    _redis_db: int = 0
+
     def _get_required_state_classes(
         self,
         target_state_cls: Type[BaseState],
@@ -3553,20 +3559,17 @@ class StateManagerRedis(StateManager):
                 return
             await self._get_pubsub_message(pubsub, timeout=remaining)
 
-    async def _wait_lock(self, lock_key: bytes, lock_id: bytes) -> None:
-        """Wait for a redis lock to be released via pubsub.
-
-        Coroutine will not return until the lock is obtained.
-
-        Args:
-            lock_key: The redis key for the lock.
-            lock_id: The ID of the lock.
+    async def _enable_keyspace_notifications(self):
+        """Enable keyspace notifications for the redis server.
 
         Raises:
             ResponseError: when the keyspace config cannot be set.
         """
-        lock_key_channel = f"__keyspace@0__:{lock_key.decode()}"
-        # Enable keyspace notifications for the lock key, so we know when it is available.
+        if self._redis_notify_keyspace_events_enabled:
+            return
+        # Find out which logical database index is being used.
+        self._redis_db = self.redis.get_connection_kwargs().get("db", self._redis_db)
+
         try:
             await self.redis.config_set(
                 "notify-keyspace-events",
@@ -3576,6 +3579,20 @@ class StateManagerRedis(StateManager):
             # Some redis servers only allow out-of-band configuration, so ignore errors here.
             if not environment.REFLEX_IGNORE_REDIS_CONFIG_ERROR.get():
                 raise
+        self._redis_notify_keyspace_events_enabled = True
+
+    async def _wait_lock(self, lock_key: bytes, lock_id: bytes) -> None:
+        """Wait for a redis lock to be released via pubsub.
+
+        Coroutine will not return until the lock is obtained.
+
+        Args:
+            lock_key: The redis key for the lock.
+            lock_id: The ID of the lock.
+        """
+        # Enable keyspace notifications for the lock key, so we know when it is available.
+        await self._enable_keyspace_notifications()
+        lock_key_channel = f"__keyspace@{self._redis_db}__:{lock_key.decode()}"
         async with self.redis.pubsub() as pubsub:
             await pubsub.psubscribe(lock_key_channel)
             # wait for the lock to be released