screen.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. import os
  2. import threading
  3. import time
  4. from contextlib import contextmanager
  5. from typing import List, Optional
  6. import pytest
  7. from selenium import webdriver
  8. from selenium.common.exceptions import (ElementNotInteractableException, NoSuchElementException,
  9. StaleElementReferenceException)
  10. from selenium.webdriver import ActionChains
  11. from selenium.webdriver.common.by import By
  12. from selenium.webdriver.remote.webelement import WebElement
  13. from nicegui import Server, app, ui
  14. from .test_helpers import TEST_DIR
  15. class Screen:
  16. PORT = 3392
  17. IMPLICIT_WAIT = 4
  18. SCREENSHOT_DIR = TEST_DIR / 'screenshots'
  19. def __init__(self, selenium: webdriver.Chrome, caplog: pytest.LogCaptureFixture) -> None:
  20. self.selenium = selenium
  21. self.caplog = caplog
  22. self.server_thread: Optional[threading.Thread] = None
  23. self.ui_run_kwargs = {'port': self.PORT, 'show': False, 'reload': False}
  24. self.connected = threading.Event()
  25. app.on_connect(self.connected.set)
  26. def start_server(self) -> None:
  27. """Start the webserver in a separate thread. This is the equivalent of `ui.run()` in a normal script."""
  28. self.server_thread = threading.Thread(target=ui.run, kwargs=self.ui_run_kwargs)
  29. self.server_thread.start()
  30. @property
  31. def is_open(self) -> None:
  32. # https://stackoverflow.com/a/66150779/3419103
  33. try:
  34. self.selenium.current_url # pylint: disable=pointless-statement
  35. return True
  36. except Exception as e:
  37. print(e)
  38. return False
  39. def stop_server(self) -> None:
  40. """Stop the webserver."""
  41. self.close()
  42. self.caplog.clear()
  43. Server.instance.should_exit = True
  44. if self.server_thread:
  45. self.server_thread.join()
  46. def open(self, path: str, timeout: float = 3.0) -> None:
  47. """Try to open the page until the server is ready or we time out.
  48. If the server is not yet running, start it.
  49. """
  50. if self.server_thread is None:
  51. self.start_server()
  52. deadline = time.time() + timeout
  53. self.connected.clear()
  54. while True:
  55. try:
  56. self.selenium.get(f'http://localhost:{self.PORT}{path}')
  57. self.selenium.find_element(By.XPATH, '//body') # ensure page and JS are loaded
  58. self.connected.wait(1) # Ensure that the client has connected to the API
  59. break
  60. except Exception as e:
  61. if time.time() > deadline:
  62. raise
  63. time.sleep(0.1)
  64. if not self.server_thread.is_alive():
  65. raise RuntimeError('The NiceGUI server has stopped running') from e
  66. def close(self) -> None:
  67. if self.is_open:
  68. self.selenium.close()
  69. def switch_to(self, tab_id: int) -> None:
  70. window_count = len(self.selenium.window_handles)
  71. if tab_id > window_count:
  72. raise IndexError(f'Could not go to or create tab {tab_id}, there are only {window_count} tabs')
  73. elif tab_id == window_count:
  74. self.selenium.switch_to.new_window('tab')
  75. else:
  76. self.selenium.switch_to.window(self.selenium.window_handles[tab_id])
  77. def should_contain(self, text: str) -> None:
  78. if self.selenium.title == text:
  79. return
  80. self.find(text)
  81. def wait_for(self, text: str) -> None:
  82. self.should_contain(text)
  83. def should_not_contain(self, text: str, wait: float = 0.5) -> None:
  84. assert self.selenium.title != text
  85. self.selenium.implicitly_wait(wait)
  86. with pytest.raises(AssertionError):
  87. self.find(text)
  88. self.selenium.implicitly_wait(self.IMPLICIT_WAIT)
  89. def should_contain_input(self, text: str) -> None:
  90. deadline = time.time() + self.IMPLICIT_WAIT
  91. while time.time() < deadline:
  92. for input_element in self.find_all_by_tag('input'):
  93. if input_element.get_attribute('value') == text:
  94. return
  95. self.wait(0.1)
  96. raise AssertionError(f'Could not find input with value "{text}"')
  97. def click(self, target_text: str) -> WebElement:
  98. element = self.find(target_text)
  99. try:
  100. element.click()
  101. except ElementNotInteractableException as e:
  102. raise AssertionError(f'Could not click on "{target_text}" on:\n{element.get_attribute("outerHTML")}') from e
  103. return element
  104. def context_click(self, target_text: str) -> WebElement:
  105. element = self.find(target_text)
  106. action = ActionChains(self.selenium)
  107. action.context_click(element).perform()
  108. return element
  109. def click_at_position(self, element: WebElement, x: int, y: int) -> None:
  110. action = ActionChains(self.selenium)
  111. action.move_to_element_with_offset(element, x, y).click().perform()
  112. def type(self, text: str) -> None:
  113. self.selenium.execute_script("window.focus();")
  114. self.wait(0.2)
  115. self.selenium.switch_to.active_element.send_keys(text)
  116. def find(self, text: str) -> WebElement:
  117. try:
  118. query = f'//*[not(self::script) and not(self::style) and text()[contains(., "{text}")]]'
  119. element = self.selenium.find_element(By.XPATH, query)
  120. try:
  121. if not element.is_displayed():
  122. self.wait(0.1) # HACK: repeat check after a short delay to avoid timing issue on fast machines
  123. if not element.is_displayed():
  124. raise AssertionError(f'Found "{text}" but it is hidden')
  125. except StaleElementReferenceException as e:
  126. raise AssertionError(f'Found "{text}" but it is hidden') from e
  127. return element
  128. except NoSuchElementException as e:
  129. raise AssertionError(f'Could not find "{text}"') from e
  130. def find_element(self, element: ui.element) -> WebElement:
  131. return self.selenium.find_element(By.ID, f'c{element.id}')
  132. def find_by_class(self, name: str) -> WebElement:
  133. return self.selenium.find_element(By.CLASS_NAME, name)
  134. def find_all_by_class(self, name: str) -> WebElement:
  135. return self.selenium.find_elements(By.CLASS_NAME, name)
  136. def find_by_tag(self, name: str) -> WebElement:
  137. return self.selenium.find_element(By.TAG_NAME, name)
  138. def find_all_by_tag(self, name: str) -> List[WebElement]:
  139. return self.selenium.find_elements(By.TAG_NAME, name)
  140. def render_js_logs(self) -> str:
  141. console = '\n'.join(l['message'] for l in self.selenium.get_log('browser'))
  142. return f'-- console logs ---\n{console}\n---------------------'
  143. def get_attributes(self, tag: str, attribute: str) -> List[str]:
  144. return [t.get_attribute(attribute) for t in self.find_all_by_tag(tag)]
  145. def wait(self, t: float) -> None:
  146. time.sleep(t)
  147. def shot(self, name: str) -> None:
  148. os.makedirs(self.SCREENSHOT_DIR, exist_ok=True)
  149. filename = f'{self.SCREENSHOT_DIR}/{name}.png'
  150. print(f'Storing screenshot to {filename}')
  151. self.selenium.get_screenshot_as_file(filename)
  152. def assert_py_logger(self, level: str, message: str) -> None:
  153. """Assert that the Python logger has received a message with the given level and text."""
  154. try:
  155. assert self.caplog.records, 'Expected a log message'
  156. record = self.caplog.records[0]
  157. print(record.levelname, record.message)
  158. assert record.levelname.strip() == level, f'Expected "{level}" but got "{record.levelname}"'
  159. assert record.message.strip() == message, f'Expected "{message}" but got "{record.message}"'
  160. finally:
  161. self.caplog.records.clear()
  162. @contextmanager
  163. def implicitly_wait(self, t: float) -> None:
  164. self.selenium.implicitly_wait(t)
  165. yield
  166. self.selenium.implicitly_wait(self.IMPLICIT_WAIT)