소스 검색

Event Loop Refactor (#1590)

Masen Furer 1 년 전
부모
커밋
2ff823e89a

+ 1 - 1
integration/conftest.py

@@ -48,7 +48,7 @@ def pytest_exception_interact(node, call, report):
     safe_filename = re.sub(
         r"(?u)[^-\w.]",
         "_",
-        str(node.nodeid).strip().replace(" ", "_"),
+        str(node.nodeid).strip().replace(" ", "_").replace(":", "_"),
     )
 
     DISPLAY.waitgrab().save(

+ 193 - 0
integration/test_dynamic_routes.py

@@ -0,0 +1,193 @@
+"""Integration tests for dynamic route page behavior."""
+import time
+from contextlib import contextmanager
+from typing import Generator
+from urllib.parse import urlsplit
+
+import pytest
+from selenium.webdriver.common.by import By
+
+from reflex.testing import AppHarness
+
+
+def DynamicRoute():
+    """App for testing dynamic routes."""
+    import reflex as rx
+
+    class DynamicState(rx.State):
+        order: list[str] = []
+        page_id: str = ""
+
+        def on_load(self):
+            self.order.append(self.page_id or "no page id")
+
+        @rx.var
+        def next_page(self) -> str:
+            try:
+                return str(int(self.page_id) + 1)
+            except ValueError:
+                return "0"
+
+        @rx.var
+        def token(self) -> str:
+            return self.get_token()
+
+    def index():
+        return rx.fragment(
+            rx.input(value=DynamicState.token, is_read_only=True, id="token"),
+            rx.input(value=DynamicState.page_id, is_read_only=True, id="page_id"),
+            rx.link("index", href="/", id="link_index"),  # type: ignore
+            rx.link("page_X", href="/static/x", id="link_page_x"),  # type: ignore
+            rx.link(
+                "next", href="/page/" + DynamicState.next_page, id="link_page_next"  # type: ignore
+            ),
+            rx.list(
+                rx.foreach(DynamicState.order, lambda i: rx.list_item(rx.text(i))),  # type: ignore
+            ),
+        )
+
+    app = rx.App(state=DynamicState)
+    app.add_page(index)
+    app.add_page(index, route="/page/[page_id]", on_load=DynamicState.on_load)  # type: ignore
+    app.add_page(index, route="/static/x", on_load=DynamicState.on_load)  # type: ignore
+    app.compile()
+
+
+@pytest.fixture(scope="session")
+def dynamic_route(tmp_path_factory) -> Generator[AppHarness, None, None]:
+    """Start DynamicRoute app at tmp_path via AppHarness.
+
+    Args:
+        tmp_path_factory: pytest tmp_path_factory fixture
+
+    Yields:
+        running AppHarness instance
+    """
+    with AppHarness.create(
+        root=tmp_path_factory.mktemp("dynamic_route"),
+        app_source=DynamicRoute,  # type: ignore
+    ) as harness:
+        yield harness
+
+
+@pytest.fixture
+def driver(dynamic_route: AppHarness):
+    """Get an instance of the browser open to the dynamic_route app.
+
+    Args:
+        dynamic_route: harness for DynamicRoute app
+
+    Yields:
+        WebDriver instance.
+    """
+    assert dynamic_route.app_instance is not None, "app is not running"
+    driver = dynamic_route.frontend()
+    try:
+        assert dynamic_route.poll_for_clients()
+        yield driver
+    finally:
+        driver.quit()
+
+
+@contextmanager
+def poll_for_navigation(driver, timeout: int = 5) -> Generator[None, None, None]:
+    """Wait for driver url to change.
+
+    Use as a contextmanager, and apply the navigation event inside the context
+    block, polling will occur after the context block exits.
+
+    Args:
+        driver: WebDriver instance.
+        timeout: Time to wait for url to change.
+
+    Yields:
+        None
+    """
+    prev_url = driver.current_url
+
+    yield
+
+    AppHarness._poll_for(lambda: prev_url != driver.current_url, timeout=timeout)
+
+
+def test_on_load_navigate(dynamic_route: AppHarness, driver):
+    """Click links to navigate between dynamic pages with on_load event.
+
+    Args:
+        dynamic_route: harness for DynamicRoute app.
+        driver: WebDriver instance.
+    """
+    assert dynamic_route.app_instance is not None
+    token_input = driver.find_element(By.ID, "token")
+    link = driver.find_element(By.ID, "link_page_next")
+    assert token_input
+    assert link
+
+    # wait for the backend connection to send the token
+    token = dynamic_route.poll_for_value(token_input)
+    assert token is not None
+
+    # click the link a few times
+    for ix in range(10):
+        # wait for navigation, then assert on url
+        with poll_for_navigation(driver):
+            link.click()
+        assert urlsplit(driver.current_url).path == f"/page/{ix}/"
+
+        link = driver.find_element(By.ID, "link_page_next")
+        page_id_input = driver.find_element(By.ID, "page_id")
+
+        assert link
+        assert page_id_input
+
+        assert dynamic_route.poll_for_value(page_id_input) == str(ix)
+
+    # look up the backend state and assert that `on_load` was called for all
+    # navigation events
+    backend_state = dynamic_route.app_instance.state_manager.states[token]
+    # TODO: navigating to dynamic page initially fires hydrate twice
+    # because the new page re-initializes `useEventLoop`, with the same hydrate event
+    # but routeChangeComplete also still fires.
+    time.sleep(0.2)
+    assert backend_state.order[1:] == [str(ix) for ix in range(10)]
+
+
+def test_on_load_navigate_non_dynamic(dynamic_route: AppHarness, driver):
+    """Click links to navigate between static pages with on_load event.
+
+
+    Args:
+        dynamic_route: harness for DynamicRoute app.
+        driver: WebDriver instance.
+    """
+    assert dynamic_route.app_instance is not None
+    token_input = driver.find_element(By.ID, "token")
+    link = driver.find_element(By.ID, "link_page_x")
+    assert token_input
+    assert link
+
+    # wait for the backend connection to send the token
+    token = dynamic_route.poll_for_value(token_input)
+    assert token is not None
+
+    with poll_for_navigation(driver):
+        link.click()
+    assert urlsplit(driver.current_url).path == "/static/x/"
+
+    # look up the backend state and assert that `on_load` was called once
+    backend_state = dynamic_route.app_instance.state_manager.states[token]
+    time.sleep(0.2)
+    assert backend_state.order == ["no page id"]
+
+    # go back to the index and navigate back to the static route
+    link = driver.find_element(By.ID, "link_index")
+    with poll_for_navigation(driver):
+        link.click()
+    assert urlsplit(driver.current_url).path == "/"
+
+    link = driver.find_element(By.ID, "link_page_x")
+    with poll_for_navigation(driver):
+        link.click()
+    assert urlsplit(driver.current_url).path == "/static/x/"
+    time.sleep(0.2)
+    assert backend_state.order == ["no page id", "no page id"]

+ 332 - 0
integration/test_event_chain.py

@@ -0,0 +1,332 @@
+"""Ensure that Event Chains are properly queued and handled between frontend and backend."""
+
+import time
+from typing import Generator
+
+import pytest
+from selenium.webdriver.common.by import By
+
+from reflex.testing import AppHarness
+
+MANY_EVENTS = 50
+
+
+def EventChain():
+    """App with chained event handlers."""
+    import reflex as rx
+
+    # repeated here since the outer global isn't exported into the App module
+    MANY_EVENTS = 50
+
+    class State(rx.State):
+        event_order: list[str] = []
+
+        @rx.var
+        def token(self) -> str:
+            return self.get_token()
+
+        def event_no_args(self):
+            self.event_order.append("event_no_args")
+
+        def event_arg(self, arg):
+            self.event_order.append(f"event_arg:{arg}")
+
+        def event_nested_1(self):
+            self.event_order.append("event_nested_1")
+            yield State.event_nested_2
+            yield State.event_arg("nested_1")  # type: ignore
+
+        def event_nested_2(self):
+            self.event_order.append("event_nested_2")
+            yield State.event_nested_3
+            yield rx.console_log("event_nested_2")
+            yield State.event_arg("nested_2")  # type: ignore
+
+        def event_nested_3(self):
+            self.event_order.append("event_nested_3")
+            yield State.event_no_args
+            yield State.event_arg("nested_3")  # type: ignore
+
+        def on_load_return_chain(self):
+            self.event_order.append("on_load_return_chain")
+            return [State.event_arg(1), State.event_arg(2), State.event_arg(3)]  # type: ignore
+
+        def on_load_yield_chain(self):
+            self.event_order.append("on_load_yield_chain")
+            yield State.event_arg(4)  # type: ignore
+            yield State.event_arg(5)  # type: ignore
+            yield State.event_arg(6)  # type: ignore
+
+        def click_return_event(self):
+            self.event_order.append("click_return_event")
+            return State.event_no_args
+
+        def click_return_events(self):
+            self.event_order.append("click_return_events")
+            return [
+                State.event_arg(7),  # type: ignore
+                rx.console_log("click_return_events"),
+                State.event_arg(8),  # type: ignore
+                State.event_arg(9),  # type: ignore
+            ]
+
+        def click_yield_chain(self):
+            self.event_order.append("click_yield_chain:0")
+            yield State.event_arg(10)  # type: ignore
+            self.event_order.append("click_yield_chain:1")
+            yield rx.console_log("click_yield_chain")
+            yield State.event_arg(11)  # type: ignore
+            self.event_order.append("click_yield_chain:2")
+            yield State.event_arg(12)  # type: ignore
+            self.event_order.append("click_yield_chain:3")
+
+        def click_yield_many_events(self):
+            self.event_order.append("click_yield_many_events")
+            for ix in range(MANY_EVENTS):
+                yield State.event_arg(ix)  # type: ignore
+                yield rx.console_log(f"many_events_{ix}")
+            self.event_order.append("click_yield_many_events_done")
+
+        def click_yield_nested(self):
+            self.event_order.append("click_yield_nested")
+            yield State.event_nested_1
+            yield State.event_arg("yield_nested")  # type: ignore
+
+        def redirect_return_chain(self):
+            self.event_order.append("redirect_return_chain")
+            yield rx.redirect("/on-load-return-chain")
+
+        def redirect_yield_chain(self):
+            self.event_order.append("redirect_yield_chain")
+            yield rx.redirect("/on-load-yield-chain")
+
+    app = rx.App(state=State)
+
+    @app.add_page
+    def index():
+        return rx.fragment(
+            rx.input(value=State.token, readonly=True, id="token"),
+            rx.button(
+                "Return Event",
+                id="return_event",
+                on_click=State.click_return_event,
+            ),
+            rx.button(
+                "Return Events",
+                id="return_events",
+                on_click=State.click_return_events,
+            ),
+            rx.button(
+                "Yield Chain",
+                id="yield_chain",
+                on_click=State.click_yield_chain,
+            ),
+            rx.button(
+                "Yield Many events",
+                id="yield_many_events",
+                on_click=State.click_yield_many_events,
+            ),
+            rx.button(
+                "Yield Nested",
+                id="yield_nested",
+                on_click=State.click_yield_nested,
+            ),
+            rx.button(
+                "Redirect Yield Chain",
+                id="redirect_yield_chain",
+                on_click=State.redirect_yield_chain,
+            ),
+            rx.button(
+                "Redirect Return Chain",
+                id="redirect_return_chain",
+                on_click=State.redirect_return_chain,
+            ),
+        )
+
+    def on_load_return_chain():
+        return rx.fragment(
+            rx.text("return"),
+            rx.input(value=State.token, readonly=True, id="token"),
+        )
+
+    def on_load_yield_chain():
+        return rx.fragment(
+            rx.text("yield"),
+            rx.input(value=State.token, readonly=True, id="token"),
+        )
+
+    app.add_page(on_load_return_chain, on_load=State.on_load_return_chain)  # type: ignore
+    app.add_page(on_load_yield_chain, on_load=State.on_load_yield_chain)  # type: ignore
+
+    app.compile()
+
+
+@pytest.fixture(scope="session")
+def event_chain(tmp_path_factory) -> Generator[AppHarness, None, None]:
+    """Start EventChain app at tmp_path via AppHarness.
+
+    Args:
+        tmp_path_factory: pytest tmp_path_factory fixture
+
+    Yields:
+        running AppHarness instance
+    """
+    with AppHarness.create(
+        root=tmp_path_factory.mktemp("event_chain"),
+        app_source=EventChain,  # type: ignore
+    ) as harness:
+        yield harness
+
+
+@pytest.fixture
+def driver(event_chain: AppHarness):
+    """Get an instance of the browser open to the event_chain app.
+
+    Args:
+        event_chain: harness for EventChain app
+
+    Yields:
+        WebDriver instance.
+    """
+    assert event_chain.app_instance is not None, "app is not running"
+    driver = event_chain.frontend()
+    try:
+        assert event_chain.poll_for_clients()
+        yield driver
+    finally:
+        driver.quit()
+
+
+@pytest.mark.parametrize(
+    ("button_id", "exp_event_order"),
+    [
+        ("return_event", ["click_return_event", "event_no_args"]),
+        (
+            "return_events",
+            ["click_return_events", "event_arg:7", "event_arg:8", "event_arg:9"],
+        ),
+        (
+            "yield_chain",
+            [
+                "click_yield_chain:0",
+                "click_yield_chain:1",
+                "click_yield_chain:2",
+                "click_yield_chain:3",
+                "event_arg:10",
+                "event_arg:11",
+                "event_arg:12",
+            ],
+        ),
+        (
+            "yield_many_events",
+            [
+                "click_yield_many_events",
+                "click_yield_many_events_done",
+                *[f"event_arg:{ix}" for ix in range(MANY_EVENTS)],
+            ],
+        ),
+        (
+            "yield_nested",
+            [
+                "click_yield_nested",
+                "event_nested_1",
+                "event_arg:yield_nested",
+                "event_nested_2",
+                "event_arg:nested_1",
+                "event_nested_3",
+                "event_arg:nested_2",
+                "event_no_args",
+                "event_arg:nested_3",
+            ],
+        ),
+        (
+            "redirect_return_chain",
+            [
+                "redirect_return_chain",
+                "on_load_return_chain",
+                "event_arg:1",
+                "event_arg:2",
+                "event_arg:3",
+            ],
+        ),
+        (
+            "redirect_yield_chain",
+            [
+                "redirect_yield_chain",
+                "on_load_yield_chain",
+                "event_arg:4",
+                "event_arg:5",
+                "event_arg:6",
+            ],
+        ),
+    ],
+)
+def test_event_chain_click(event_chain, driver, button_id, exp_event_order):
+    """Click the button, assert that the events are handled in the correct order.
+
+    Args:
+        event_chain: AppHarness for the event_chain app
+        driver: selenium WebDriver open to the app
+        button_id: the ID of the button to click
+        exp_event_order: the expected events recorded in the State
+    """
+    token_input = driver.find_element(By.ID, "token")
+    btn = driver.find_element(By.ID, button_id)
+    assert token_input
+    assert btn
+
+    token = event_chain.poll_for_value(token_input)
+
+    btn.click()
+    if "redirect" in button_id:
+        # wait a bit longer if we're redirecting
+        time.sleep(1)
+    if "many_events" in button_id:
+        # wait a bit longer if we have loads of events
+        time.sleep(1)
+    time.sleep(0.5)
+    backend_state = event_chain.app_instance.state_manager.states[token]
+    assert backend_state.event_order == exp_event_order
+
+
+@pytest.mark.parametrize(
+    ("uri", "exp_event_order"),
+    [
+        (
+            "/on-load-return-chain",
+            [
+                "on_load_return_chain",
+                "event_arg:1",
+                "event_arg:2",
+                "event_arg:3",
+            ],
+        ),
+        (
+            "/on-load-yield-chain",
+            [
+                "on_load_yield_chain",
+                "event_arg:4",
+                "event_arg:5",
+                "event_arg:6",
+            ],
+        ),
+    ],
+)
+def test_event_chain_on_load(event_chain, driver, uri, exp_event_order):
+    """Load the URI, assert that the events are handled in the correct order.
+
+    Args:
+        event_chain: AppHarness for the event_chain app
+        driver: selenium WebDriver open to the app
+        uri: the page to load
+        exp_event_order: the expected events recorded in the State
+    """
+    driver.get(event_chain.frontend_url + uri)
+    token_input = driver.find_element(By.ID, "token")
+    assert token_input
+
+    token = event_chain.poll_for_value(token_input)
+
+    time.sleep(0.5)
+    backend_state = event_chain.app_instance.state_manager.states[token]
+    assert backend_state.event_order == exp_event_order

+ 4 - 3
integration/test_input.py

@@ -78,6 +78,7 @@ async def test_fully_controlled_input(fully_controlled_input: AppHarness):
     # move cursor to home, then to the right and type characters
     debounce_input.send_keys(Keys.HOME, Keys.ARROW_RIGHT)
     debounce_input.send_keys("foo")
+    time.sleep(0.5)
     assert debounce_input.get_attribute("value") == "ifoonitial"
     assert backend_state.text == "ifoonitial"
     assert fully_controlled_input.poll_for_value(value_input) == "ifoonitial"
@@ -96,21 +97,21 @@ async def test_fully_controlled_input(fully_controlled_input: AppHarness):
 
     # type more characters
     debounce_input.send_keys("getting testing done")
-    time.sleep(0.2)
+    time.sleep(0.5)
     assert debounce_input.get_attribute("value") == "getting testing done"
     assert backend_state.text == "getting testing done"
     assert fully_controlled_input.poll_for_value(value_input) == "getting testing done"
 
     # type into the on_change input
     on_change_input.send_keys("overwrite the state")
-    time.sleep(0.2)
+    time.sleep(0.5)
     assert debounce_input.get_attribute("value") == "overwrite the state"
     assert on_change_input.get_attribute("value") == "overwrite the state"
     assert backend_state.text == "overwrite the state"
     assert fully_controlled_input.poll_for_value(value_input) == "overwrite the state"
 
     clear_button.click()
-    time.sleep(0.2)
+    time.sleep(0.5)
     assert on_change_input.get_attribute("value") == ""
     # potential bug: clearing the on_change field doesn't itself trigger on_change
     # assert backend_state.text == ""

+ 6 - 54
reflex/.templates/jinja/web/pages/index.js.jinja2

@@ -8,24 +8,10 @@
 
 {% block export %}
 export default function Component() {
-  const [{{state_name}}, {{state_name|react_setter}}] = useState({{initial_state|json_dumps}})
-  const [{{const.result}}, {{const.result|react_setter}}] = useState({{const.initial_result|json_dumps}})
-  const [notConnected, setNotConnected] = useState(false)
   const {{const.router}} = useRouter()
-  const {{const.socket}} = useRef(null)
-  const { isReady } = {{const.router}}
   const { {{const.color_mode}}, {{const.toggle_color_mode}} } = {{const.use_color_mode}}()
   const focusRef = useRef();
   
-  // Function to add new events to the event queue.
-  const Event = (events, _e) => {
-      preventDefault(_e);
-      {{state_name|react_setter}}(state => ({
-        ...state,
-        events: [...state.events, ...events],
-      }))
-  }
-
   // Function to add new files to be uploaded.
   const File = files => {{state_name|react_setter}}(state => ({
     ...state,
@@ -33,46 +19,10 @@ export default function Component() {
   }))
 
   // Main event loop.
-  useEffect(()=> {
-    // Skip if the router is not ready.
-    if (!isReady) {
-      return;
-    }
-
-    // Initialize the websocket connection.
-    if (!{{const.socket}}.current) {
-      connect({{const.socket}}, {{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{transports}}, setNotConnected)
-    }
-
-    // If we are not processing an event, process the next event.
-    if (!{{const.result}}.{{const.processing}}) {
-      processEvent({{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{const.socket}}.current)
-    }
-
-    // Reset the result.
-    {{const.result|react_setter}}(result => {
-      // If there is a new result, update the state.
-      if ({{const.result}}.{{const.state}} != null) {
-        // Apply the new result to the state and the new events to the queue.
-        {{state_name|react_setter}}(state => {
-          return {
-            ...{{const.result}}.{{const.state}},
-            events: [...state.{{const.events}}, ...{{const.result}}.{{const.events}}],
-          } 
-        })
-        return {
-          {{const.state}}: null,
-          {{const.events}}: [],
-          {{const.final}}: true,
-          {{const.processing}}: !{{const.result}}.{{const.final}},
-        }
-      }
-      return result;
-    })
-
-    // Process the next event.
-    processEvent({{state_name}}, {{state_name|react_setter}}, {{const.result}}, {{const.result|react_setter}}, {{const.router}}, {{const.socket}}.current)
-  })
+  const [{{state_name}}, Event, notConnected] = useEventLoop(
+    {{initial_state|json_dumps}},
+    [E('{{state_name}}.{{const.hydrate}}', {})],
+  )
 
   // Set focus to the specified element.
   useEffect(() => {
@@ -81,6 +31,7 @@ export default function Component() {
     }
   })
 
+  {% if is_dynamic %}
   // Route after the initial page hydration.
   useEffect(() => {
     const change_complete = () => Event([E('{{state_name}}.{{const.hydrate}}', {})])
@@ -89,6 +40,7 @@ export default function Component() {
       {{const.router}}.events.off('routeChangeComplete', change_complete)
     }
   }, [{{const.router}}])
+  {% endif %}
 
   {% for hook in hooks %}
   {{ hook }}

+ 102 - 60
reflex/.templates/web/utils/state.js

@@ -4,6 +4,8 @@ import io from "socket.io-client";
 import JSON5 from "json5";
 import env from "env.json";
 import Cookies from "universal-cookie";
+import { useEffect, useReducer, useRef, useState } from "react";
+import Router, { useRouter } from "next/router";
 
 
 // Endpoint URLs.
@@ -23,6 +25,11 @@ const cookies = new Cookies();
 // Dictionary holding component references.
 export const refs = {};
 
+// Flag ensures that only one event is processing on the backend concurrently.
+let event_processing = false
+// Array holding pending events to be processed.
+const event_queue = [];
+
 /**
  * Generate a UUID (Used for session tokens).
  * Taken from: https://stackoverflow.com/questions/105034/how-do-i-create-a-guid-uuid
@@ -67,6 +74,7 @@ export const getToken = () => {
  * @param delta The delta to apply.
  */
 export const applyDelta = (state, delta) => {
+  const new_state = {...state}
   for (const substate in delta) {
     let s = state;
     const path = substate.split(".").slice(1);
@@ -77,6 +85,7 @@ export const applyDelta = (state, delta) => {
       s[key] = delta[substate][key];
     }
   }
+  return new_state
 };
 
 
@@ -97,17 +106,16 @@ export const getAllLocalStorageItems = () => {
 
 
 /**
- * Send an event to the server.
+ * Handle frontend event or send the event to the backend via Websocket.
  * @param event The event to send.
- * @param router The router object.
  * @param socket The socket object to send the event on.
  *
  * @returns True if the event was sent, false if it was handled locally.
  */
-export const applyEvent = async (event, router, socket) => {
+export const applyEvent = async (event, socket) => {
   // Handle special events
   if (event.name == "_redirect") {
-    router.push(event.payload.path);
+    Router.push(event.payload.path);
     return false;
   }
 
@@ -168,7 +176,7 @@ export const applyEvent = async (event, router, socket) => {
 
   // Send the event to the server.
   event.token = getToken();
-  event.router_data = (({ pathname, query, asPath }) => ({ pathname, query, asPath }))(router);
+  event.router_data = (({ pathname, query, asPath }) => ({ pathname, query, asPath }))(Router);
 
   if (socket) {
     socket.emit("event", JSON.stringify(event));
@@ -179,87 +187,80 @@ export const applyEvent = async (event, router, socket) => {
 };
 
 /**
- * Process an event off the event queue.
- * @param event The current event
+ * Send an event to the server via REST.
+ * @param event The current event.
  * @param state The state with the event queue.
- * @param setResult The function to set the result.
  *
  * @returns Whether the event was sent.
  */
-export const applyRestEvent = async (event, state, setResult) => {
+export const applyRestEvent = async (event, state) => {
   let eventSent = false;
   if (event.handler == "uploadFiles") {
-    eventSent = await uploadFiles(state, setResult, event.name);
+    eventSent = await uploadFiles(state, event.name);
   }
   return eventSent;
 };
 
+/**
+ * Queue events to be processed and trigger processing of queue.
+ * @param events Array of events to queue.
+ * @param socket The socket object to send the event on.
+ */
+export const queueEvents = async (events, socket) => {
+  event_queue.push(...events)
+  await processEvent(socket.current)
+}
+
 /**
  * Process an event off the event queue.
- * @param state The state with the event queue.
- * @param setState The function to set the state.
- * @param result The current result.
- * @param setResult The function to set the result.
- * @param router The router object.
  * @param socket The socket object to send the event on.
  */
 export const processEvent = async (
-  state,
-  setState,
-  result,
-  setResult,
-  router,
   socket
 ) => {
-  // If we are already processing an event, or there are no events to process, return.
-  if (result.processing || state.events.length == 0) {
+  // Only proceed if we're not already processing an event.
+  if (event_queue.length === 0 || event_processing) {
     return;
   }
 
   // Set processing to true to block other events from being processed.
-  setResult({ ...result, processing: true });
+  event_processing = true
 
   // Apply the next event in the queue.
-  const event = state.events.shift();
-
-  // Set new events to avoid reprocessing the same event.
-  setState(currentState => ({ ...currentState, events: state.events }));
+  const event = event_queue.shift();
 
+  let eventSent = false
   // Process events with handlers via REST and all others via websockets.
-  let eventSent = false;
   if (event.handler) {
-    eventSent = await applyRestEvent(event, state, setResult);
+    eventSent = await applyRestEvent(event, currentState);
   } else {
-    eventSent = await applyEvent(event, router, socket);
+    eventSent = await applyEvent(event, socket);
   }
-
   // If no event was sent, set processing to false.
   if (!eventSent) {
-    setResult({ ...result, final: true, processing: false });
+    event_processing = false;
+    // recursively call processEvent to drain the queue, since there is
+    // no state update to trigger the useEffect event loop.
+    await processEvent(socket)
   }
-};
+}
 
 /**
  * Connect to a websocket and set the handlers.
  * @param socket The socket object to connect.
- * @param state The state object to apply the deltas to.
- * @param setState The function to set the state.
- * @param result The current result.
- * @param setResult The function to set the result.
- * @param endpoint The endpoint to connect to.
+ * @param dispatch The function to queue state update
  * @param transports The transports to use.
+ * @param setNotConnected The function to update connection state.
+ * @param initial_events Array of events to seed the queue after connecting.
  */
 export const connect = async (
   socket,
-  state,
-  setState,
-  result,
-  setResult,
-  router,
+  dispatch,
   transports,
-  setNotConnected
+  setNotConnected,
+  initial_events = [],
 ) => {
-  // Get backend URL object from the endpoint
+  // Get backend URL object from the endpoint.
   const endpoint = new URL(EVENTURL);
   // Create the socket.
   socket.current = io(EVENTURL, {
@@ -270,7 +271,7 @@ export const connect = async (
 
   // Once the socket is open, hydrate the page.
   socket.current.on("connect", () => {
-    processEvent(state, setState, result, setResult, router, socket.current);
+    queueEvents(initial_events, socket)
     setNotConnected(false)
   });
 
@@ -278,16 +279,14 @@ export const connect = async (
     setNotConnected(true)
   });
 
-  // On each received message, apply the delta and set the result.
-  socket.current.on("event", update => {
-    update = JSON5.parse(update);
-    applyDelta(state, update.delta);
-    setResult(result => ({
-      state: state,
-      events: [...result.events, ...update.events],
-      final: update.final,
-      processing: true,
-    }));
+  // On each received message, queue the updates and events.
+  socket.current.on("event", message => {
+    const update = JSON5.parse(message)
+    dispatch(update.delta)
+    event_processing = !update.final
+    if (update.events) {
+      queueEvents(update.events, socket)
+    }
   });
 };
 
@@ -295,13 +294,11 @@ export const connect = async (
  * Upload files to the server.
  *
  * @param state The state to apply the delta to.
- * @param setResult The function to set the result.
  * @param handler The handler to use.
- * @param endpoint The endpoint to upload to.
  *
  * @returns Whether the files were uploaded.
  */
-export const uploadFiles = async (state, setResult, handler) => {
+export const uploadFiles = async (state, handler) => {
   const files = state.files;
 
   // return if there's no file to upload
@@ -350,7 +347,6 @@ export const uploadFiles = async (state, setResult, handler) => {
  * Create an event object.
  * @param name The name of the event.
  * @param payload The payload of the event.
- * @param use_websocket Whether the event uses websocket.
  * @param handler The client handler to process event.
  * @returns The event object.
  */
@@ -358,6 +354,52 @@ export const E = (name, payload = {}, handler = null) => {
   return { name, payload, handler };
 };
 
+/**
+ * Establish websocket event loop for a NextJS page.
+ * @param initial_state The initial page state.
+ * @param initial_events Array of events to seed the queue after connecting.
+ *
+ * @returns [state, Event, notConnected] -
+ *   state is a reactive dict,
+ *   Event is used to queue an event, and
+ *   notConnected is a reactive boolean indicating whether the websocket is connected.
+ */
+export const useEventLoop = (
+  initial_state = {},
+  initial_events = [],
+) => {
+  const socket = useRef(null)
+  const router = useRouter()
+  const [state, dispatch] = useReducer(applyDelta, initial_state)
+  const [notConnected, setNotConnected] = useState(false)
+  
+  // Function to add new events to the event queue.
+  const Event = (events, _e) => {
+      preventDefault(_e);
+      queueEvents(events, socket)
+  }
+
+  // Main event loop.
+  useEffect(() => {
+    // Skip if the router is not ready.
+    if (!router.isReady) {
+      return;
+    }
+
+    // Initialize the websocket connection.
+    if (!socket.current) {
+      connect(socket, dispatch, ['websocket', 'polling'], setNotConnected, initial_events)
+    }
+    (async () => {
+      // Process all outstanding events.
+      while (event_queue.length > 0 && !event_processing) {
+        await processEvent(socket.current)
+      }
+    })()
+  })
+  return [state, Event, notConnected]
+}
+
 /***
  * Check if a value is truthy in python.
  * @param val The value to check.

+ 11 - 5
reflex/compiler/compiler.py

@@ -6,6 +6,7 @@ from typing import List, Set, Tuple, Type
 from reflex import constants
 from reflex.compiler import templates, utils
 from reflex.components.component import Component, ComponentStyle, CustomComponent
+from reflex.route import get_route_args
 from reflex.state import State
 from reflex.utils import imports
 from reflex.vars import ImportVar
@@ -20,8 +21,6 @@ DEFAULT_IMPORTS: imports.ImportDict = {
     },
     "next/router": {ImportVar(tag="useRouter")},
     f"/{constants.STATE_PATH}": {
-        ImportVar(tag="connect"),
-        ImportVar(tag="processEvent"),
         ImportVar(tag="uploadFiles"),
         ImportVar(tag="E"),
         ImportVar(tag="isTrue"),
@@ -30,6 +29,7 @@ DEFAULT_IMPORTS: imports.ImportDict = {
         ImportVar(tag="getRefValue"),
         ImportVar(tag="getRefValues"),
         ImportVar(tag="getAllLocalStorageItems"),
+        ImportVar(tag="useEventLoop"),
     },
     "": {ImportVar(tag="focus-visible/dist/focus-visible")},
     "@chakra-ui/react": {
@@ -68,7 +68,10 @@ def _compile_theme(theme: dict) -> str:
 
 
 def _compile_page(
-    component: Component, state: Type[State], connect_error_component
+    component: Component,
+    state: Type[State],
+    connect_error_component,
+    is_dynamic: bool,
 ) -> str:
     """Compile the component given the app state.
 
@@ -76,7 +79,7 @@ def _compile_page(
         component: The component to compile.
         state: The app state.
         connect_error_component: The component to render on sever connection error.
-
+        is_dynamic: if True, include route change re-hydration logic
 
     Returns:
         The compiled component.
@@ -96,6 +99,7 @@ def _compile_page(
         render=component.render(),
         transports=constants.Transports.POLLING_WEBSOCKET.get_transports(),
         err_comp=connect_error_component.render() if connect_error_component else None,
+        is_dynamic=is_dynamic,
     )
 
 
@@ -203,7 +207,9 @@ def compile_page(
     output_path = utils.get_page_path(path)
 
     # Add the style to the component.
-    code = _compile_page(component, state, connect_error_component)
+    code = _compile_page(
+        component, state, connect_error_component, is_dynamic=bool(get_route_args(path))
+    )
     return output_path, code
 
 

+ 0 - 2
reflex/compiler/utils.py

@@ -19,7 +19,6 @@ from reflex.components.base import (
     Title,
 )
 from reflex.components.component import Component, ComponentStyle, CustomComponent
-from reflex.event import get_hydrate_event
 from reflex.state import State
 from reflex.style import Style
 from reflex.utils import format, imports, path_ops
@@ -129,7 +128,6 @@ def compile_state(state: Type[State]) -> Dict:
         initial_state = state().dict(include_computed=False)
     initial_state.update(
         {
-            "events": [{"name": get_hydrate_event(state)}],
             "files": [],
         }
     )