1
0

screen.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. import os
  2. import threading
  3. import time
  4. from typing import List
  5. import pytest
  6. from bs4 import BeautifulSoup
  7. from selenium import webdriver
  8. from selenium.common.exceptions import ElementNotInteractableException, NoSuchElementException
  9. from selenium.webdriver import ActionChains
  10. from selenium.webdriver.common.by import By
  11. from selenium.webdriver.remote.webelement import WebElement
  12. from nicegui import globals, ui
  # Port the test server is started on (see `Screen.UI_RUN_KWARGS`) and the browser connects to.
  PORT = 3392

  # CSS class names of layout/Quasar elements.
  # NOTE(review): not referenced in the visible part of this file -- presumably classes that are
  # skipped when simplifying rendered HTML; confirm against callers before removing.
  IGNORED_CLASSES = [
      'row',
      'column',
      'q-card',
      'q-field',
      'q-field__label',
      'q-input',
  ]
  def remove_prefix(text: str, prefix: str) -> str:
      '''Return `text` with a leading `prefix` stripped once.

      `text` is returned unchanged when `prefix` is empty or `text` does not start with it.
      '''
      has_prefix = bool(prefix) and text.startswith(prefix)
      return text[len(prefix):] if has_prefix else text
  class Screen:
      '''Test helper that drives a selenium browser against a NiceGUI server.

      The server is started lazily in a background thread the first time `open()` is called
      and is shut down again by `stop_server()`.
      '''

      SCREENSHOT_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'screenshots')
      UI_RUN_KWARGS = {'port': PORT, 'show': False, 'reload': False}

      def __init__(self, selenium: webdriver.Chrome, caplog: pytest.LogCaptureFixture) -> None:
          '''Store the browser driver and log capture fixture; the server is not started yet.'''
          self.selenium = selenium
          self.caplog = caplog
          self.server_thread = None  # set by `start_server()`

      def start_server(self) -> None:
          '''Start the webserver in a separate thread. This is the equivalent of `ui.run()` in a normal script.'''
          self.server_thread = threading.Thread(target=ui.run, kwargs=self.UI_RUN_KWARGS)
          self.server_thread.start()

      @property
      def is_open(self) -> bool:
          '''Whether the browser's current URL can still be read.'''
          # https://stackoverflow.com/a/66150779/3419103
          try:
              # Only the side effect matters: any failure to read the URL is treated as "not open".
              self.selenium.current_url
              return True
          except Exception:  # not a bare `except`, so KeyboardInterrupt/SystemExit still propagate
              return False

      def stop_server(self) -> None:
          '''Stop the webserver.

          Closes the browser window first. Does nothing else if the server was never started.
          '''
          self.close()
          if self.server_thread is None:
              return
          globals.server.should_exit = True
          self.server_thread.join()

      def open(self, path: str, *, timeout: float = 3.0) -> None:
          '''Navigate the browser to `path` on the test server, starting the server if needed.

          The request is retried every 0.1 s because the server may still be starting up.

          :param path: path part of the URL (appended to `http://localhost:{PORT}`)
          :param timeout: seconds after which the last navigation error is re-raised
          :raises RuntimeError: if the server thread died while waiting
          '''
          if self.server_thread is None:
              self.start_server()
          deadline = time.monotonic() + timeout
          while True:
              try:
                  self.selenium.get(f'http://localhost:{PORT}{path}')
                  break
              except Exception:
                  if time.monotonic() > deadline:
                      raise
                  time.sleep(0.1)
                  if not self.server_thread.is_alive():
                      raise RuntimeError('The NiceGUI server has stopped running')

      def close(self) -> None:
          '''Close the browser window if it is still open.'''
          if self.is_open:
              self.selenium.close()

      def should_contain(self, text: str) -> None:
          '''Assert that `text` is the page title or is found on the page by `find()`.'''
          assert self.selenium.title == text or self.find(text), f'could not find "{text}"'

      def should_not_contain(self, text: str) -> None:
          '''Assert that `text` is neither the page title nor found on the page by `find()`.'''
          assert self.selenium.title != text, f'found "{text}" as page title'
          with pytest.raises(AssertionError):
              self.find(text)

      def click(self, target_text: str) -> WebElement:
          '''Click the element found by `find(target_text)` and return it.

          :raises AssertionError: if the element is not found or is not interactable
          '''
          element = self.find(target_text)
          try:
              element.click()
          except ElementNotInteractableException as e:
              raise AssertionError(
                  f'Could not click on "{target_text}" on:\n{element.get_attribute("outerHTML")}') from e
          return element

      def click_at_position(self, element: WebElement, x: int, y: int) -> None:
          '''Move to the given offset relative to `element` and click there.'''
          action = ActionChains(self.selenium)
          action.move_to_element_with_offset(element, x, y).click().perform()

      @staticmethod
      def _xpath_literal(text: str) -> str:
          '''Quote `text` as an XPath 1.0 string literal.

          XPath 1.0 has no escape character, so a text containing both quote types
          has to be assembled with `concat()`.
          '''
          if '"' not in text:
              return f'"{text}"'
          if "'" not in text:
              return f"'{text}'"
          pieces = ', \'"\', '.join(f'"{part}"' for part in text.split('"'))
          return f'concat({pieces})'

      def find(self, text: str) -> WebElement:
          '''Return the first element (ignoring script and style tags) whose text contains `text`.

          :raises AssertionError: if no such element exists
          '''
          try:
              literal = self._xpath_literal(text)
              query = f'//*[not(self::script) and not(self::style) and contains(text(), {literal})]'
              return self.selenium.find_element(By.XPATH, query)
          except NoSuchElementException as e:
              raise AssertionError(f'Could not find "{text}"') from e

      def render_html(self) -> str:
          '''Return the prettified page source with lengthy script/style contents replaced by a placeholder.'''
          body = self.selenium.page_source
          soup = BeautifulSoup(body, 'html.parser')
          for element in soup.find_all(['script', 'style']):
              if len(element.text) > 10:
                  element.string = '... removed lengthy content ...'
          return soup.prettify()

      def render_js_logs(self) -> str:
          '''Return the messages of the browser log, framed by separator lines.'''
          console = '\n'.join(entry['message'] for entry in self.selenium.get_log('browser'))
          return f'-- console logs ---\n{console}\n---------------------'

      @staticmethod
      def simplify_input_tags(soup: BeautifulSoup) -> None:
          '''Replace every `q-field` element in `soup` (in place) with a compact `simple_input` tag.

          The replacement text is "<label>: <value or placeholder> *<messages>", where each part
          is omitted if the corresponding child element or attribute is missing.
          '''
          for element in soup.find_all(class_='q-field'):
              new = soup.new_tag('simple_input')
              # `find` returns None for missing children, so guard every lookup
              label = element.find(class_='q-field__label')
              native = element.find(class_='q-field__native')
              messages = element.find(class_='q-field__messages')
              name = label.text if label is not None else ''
              placeholder = native.get('placeholder') if native is not None else None
              value = native.get('value') if native is not None else None
              new.string = (
                  (f'{name}: ' if name else '')
                  + (value or placeholder or '')
                  + (f' \u002A{messages.text}' if messages is not None else '')
              )
              new['class'] = element['class']
              element.replace_with(new)

      def get_tags(self, name: str) -> List[WebElement]:
          '''Return all elements with the given tag name.'''
          return self.selenium.find_elements(By.TAG_NAME, name)

      def get_attributes(self, tag: str, attribute: str) -> List[str]:
          '''Return the value of `attribute` for every element with the given tag name.'''
          return [t.get_attribute(attribute) for t in self.get_tags(tag)]

      def wait(self, t: float) -> None:
          '''Sleep for `t` seconds.'''
          time.sleep(t)

      def wait_for(self, text: str, *, timeout: float = 1.0) -> None:
          '''Poll every 0.1 s until `find(text)` succeeds.

          :raises TimeoutError: if the text was not found within `timeout` seconds
          '''
          deadline = time.monotonic() + timeout
          while time.monotonic() < deadline:
              try:
                  self.find(text)
                  return
              except Exception:  # keep retrying on lookup errors, but let KeyboardInterrupt through
                  self.wait(0.1)
          raise TimeoutError(f'could not find "{text}" within {timeout} s')

      def shot(self, name: str) -> None:
          '''Store a screenshot of the browser as `<name>.png` in `SCREENSHOT_DIR`.'''
          os.makedirs(self.SCREENSHOT_DIR, exist_ok=True)
          filename = os.path.join(self.SCREENSHOT_DIR, f'{name}.png')
          print(f'Storing screenshot to {filename}')
          self.selenium.get_screenshot_as_file(filename)

      def assert_py_logger(self, name: str, message: str) -> None:
          '''Assert that exactly one log record with the given level name and message was captured.

          The captured records are cleared afterwards.
          '''
          assert len(self.caplog.records) == 1, 'Expected exactly one log message'
          record = self.caplog.records[0]
          print('---------------', record.levelname, record.message)
          assert record.levelname == name, f'Expected "{name}" but got "{record.levelname}"'
          assert record.message == message, f'Expected "{message}" but got "{record.message}"'
          self.caplog.records.clear()
  130. self.caplog.records.clear()