Explorar o código

stop timer on private page after socket has been closed

Falko Schindler %!s(int64=2) %!d(string=hai) anos
pai
achega
f806bad4e4
Modificáronse 5 ficheiros con 74 adicións e 4 borrados
  1. 1 0
      nicegui/page.py
  2. 12 2
      nicegui/timer.py
  3. 2 1
      tests/conftest.py
  4. 14 1
      tests/screen.py
  5. 45 0
      tests/test_timer.py

+ 1 - 0
nicegui/page.py

@@ -49,6 +49,7 @@ class Page(jp.QuasarPage):
         self.page_ready_handler = on_page_ready
         self.page_ready_generator: Optional[Generator[None, None, None]] = None
         self.disconnect_handler = on_disconnect
+        self.shared = shared
         self.delete_flag = not shared
 
         self.waiting_javascript_commands: Dict[str, str] = {}

+ 12 - 2
nicegui/timer.py

@@ -2,12 +2,14 @@ import asyncio
 import time
 import traceback
 from collections import namedtuple
-from typing import Callable, List
+from typing import Callable, List, Optional
+
+from starlette.websockets import WebSocket
 
 from . import globals
 from .binding import BindableProperty
 from .helpers import is_coroutine
-from .page import find_parent_view
+from .page import Page, find_parent_page, find_parent_view
 from .task_logger import create_task
 
 NamedCoroutine = namedtuple('NamedCoroutine', ['name', 'coro'])
@@ -33,6 +35,8 @@ class Timer:
 
         self.active = active
         self.interval = interval
+        self.socket: Optional[WebSocket] = None
+        self.parent_page = find_parent_page()
         self.parent_view = find_parent_view()
 
         async def do_callback():
@@ -50,6 +54,12 @@ class Timer:
 
         async def loop():
             while True:
+                if not self.parent_page.shared:
+                    sockets = list(Page.sockets.get(self.parent_page.page_id, {}).values())
+                    if not self.socket and sockets:
+                        self.socket = sockets[0]
+                    elif self.socket and not sockets:
+                        return
                 try:
                     start = time.time()
                     if self.active:

+ 2 - 1
tests/conftest.py

@@ -58,5 +58,6 @@ def remove_all_screenshots() -> None:
 def screen(selenium: webdriver.Chrome, request: pytest.FixtureRequest) -> Generator[Screen, None, None]:
     screen = Screen(selenium)
     yield screen
-    screen.shot(request.node.name)
+    if screen.is_open:
+        screen.shot(request.node.name)
     screen.stop_server()

+ 14 - 1
tests/screen.py

@@ -31,9 +31,18 @@ class Screen:
         self.server_thread = threading.Thread(target=ui.run, kwargs=self.UI_RUN_KWARGS)
         self.server_thread.start()
 
+    @property
+    def is_open(self) -> None:
+        # https://stackoverflow.com/a/66150779/3419103
+        try:
+            self.selenium.current_url
+            return True
+        except:
+            return False
+
     def stop_server(self) -> None:
         '''Stop the webserver.'''
-        self.selenium.close()
+        self.close()
         globals.server.should_exit = True
         self.server_thread.join()
 
@@ -52,6 +61,10 @@ class Screen:
                 if not self.server_thread.is_alive():
                     raise RuntimeError('The NiceGUI server has stopped running')
 
+    def close(self) -> None:
+        if self.is_open:
+            self.selenium.close()
+
     def should_contain(self, text: str) -> None:
         assert self.selenium.title == text or self.find(text), \
             f'could not find "{text}" on:\n{self.render_content()}'

+ 45 - 0
tests/test_timer.py

@@ -0,0 +1,45 @@
+from nicegui import ui
+
+from .screen import Screen
+
+
+class Counter:
+    value = 0
+
+    def increment(self):
+        self.value += 1
+
+
+def test_timer(screen: Screen):
+    counter = Counter()
+    ui.timer(0.1, counter.increment)
+
+    assert counter.value == 0, 'count is initially zero'
+    screen.wait(0.5)
+    assert counter.value == 0, 'timer is not running'
+
+    screen.start_server()
+    screen.wait(0.5)
+    assert counter.value > 0, 'timer is running after starting the server'
+
+
+def test_timer_on_private_page(screen: Screen):
+    counter = Counter()
+
+    @ui.page('/')
+    def page():
+        ui.timer(0.1, counter.increment)
+
+    assert counter.value == 0, 'count is initially zero'
+    screen.start_server()
+    screen.wait(0.5)
+    assert counter.value == 0, 'timer is not running even after starting the server'
+
+    screen.open('/')
+    screen.wait(0.5)
+    assert counter.value > 0, 'timer is running after opening the page'
+
+    screen.close()
+    count = counter.value
+    screen.wait(0.5)
+    assert counter.value == count, 'timer is not running anymore after closing the page'