user.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. from __future__ import annotations
  2. import asyncio
  3. import re
  4. from typing import Any, List, Optional, Set, Type, TypeVar, Union, overload
  5. from uuid import uuid4
  6. import httpx
  7. import socketio
  8. from nicegui import Client, ElementFilter, ui
  9. from nicegui.element import Element
  10. from nicegui.nicegui import _on_handshake
  11. from .user_download import UserDownload
  12. from .user_interaction import UserInteraction
  13. from .user_navigate import UserNavigate
  14. from .user_notify import UserNotify
  15. # pylint: disable=protected-access
  16. T = TypeVar('T', bound=Element)
  17. class User:
  18. current_user: Optional[User] = None
  19. def __init__(self, client: httpx.AsyncClient) -> None:
  20. self.http_client = client
  21. self.sio = socketio.AsyncClient()
  22. self.client: Optional[Client] = None
  23. self.back_history: List[str] = []
  24. self.forward_history: List[str] = []
  25. self.navigate = UserNavigate(self)
  26. self.notify = UserNotify()
  27. self.download = UserDownload(self)
  28. def __getattribute__(self, name: str) -> Any:
  29. if name not in {'notify', 'navigate', 'download'}: # NOTE: avoid infinite recursion
  30. ui.navigate = self.navigate
  31. ui.notify = self.notify
  32. ui.download = self.download
  33. return super().__getattribute__(name)
  34. async def open(self, path: str, *, clear_forward_history: bool = True) -> Client:
  35. """Open the given path."""
  36. response = await self.http_client.get(path, follow_redirects=True)
  37. assert response.status_code == 200, f'Expected status code 200, got {response.status_code}'
  38. if response.headers.get('X-Nicegui-Content') != 'page':
  39. raise ValueError(f'Expected a page response, got {response.text}')
  40. match = re.search(r"'client_id': '([0-9a-f-]+)'", response.text)
  41. assert match is not None
  42. client_id = match.group(1)
  43. self.client = Client.instances[client_id]
  44. self.sio.on('connect')
  45. await _on_handshake(f'test-{uuid4()}', {'client_id': self.client.id, 'tab_id': str(uuid4())})
  46. self.back_history.append(path)
  47. if clear_forward_history:
  48. self.forward_history.clear()
  49. return self.client
  50. @overload
  51. async def should_see(self,
  52. target: Union[str, Type[T]],
  53. *,
  54. retries: int = 3,
  55. ) -> None:
  56. ...
  57. @overload
  58. async def should_see(self,
  59. *,
  60. kind: Optional[Type[T]] = None,
  61. marker: Union[str, List[str], None] = None,
  62. content: Union[str, List[str], None] = None,
  63. retries: int = 3,
  64. ) -> None:
  65. ...
  66. async def should_see(self,
  67. target: Union[str, Type[T], None] = None,
  68. *,
  69. kind: Optional[Type[T]] = None,
  70. marker: Union[str, List[str], None] = None,
  71. content: Union[str, List[str], None] = None,
  72. retries: int = 3,
  73. ) -> None:
  74. """Assert that the page contains an element fulfilling certain filter rules.
  75. Note that there is no scrolling in the user simulation -- the entire page is always *visible*.
  76. Due to asynchronous execution, sometimes the expected elements only appear after a short delay.
  77. By default `should_see` makes three attempts to find the element before failing.
  78. This can be adjusted with the `retries` parameter.
  79. """
  80. assert self.client
  81. for _ in range(retries):
  82. with self.client:
  83. if self.notify.contains(target) or self._gather_elements(target, kind, marker, content):
  84. return
  85. await asyncio.sleep(0.1)
  86. raise AssertionError('expected to see at least one ' + self._build_error_message(target, kind, marker, content))
  87. @overload
  88. async def should_not_see(self,
  89. target: Union[str, Type[T]],
  90. *,
  91. retries: int = 3,
  92. ) -> None:
  93. ...
  94. @overload
  95. async def should_not_see(self,
  96. *,
  97. kind: Optional[Type[T]] = None,
  98. marker: Union[str, List[str], None] = None,
  99. content: Union[str, List[str], None] = None,
  100. retries: int = 3,
  101. ) -> None:
  102. ...
  103. async def should_not_see(self,
  104. target: Union[str, Type[T], None] = None,
  105. *,
  106. kind: Optional[Type[T]] = None,
  107. marker: Union[str, List[str], None] = None,
  108. content: Union[str, List[str], None] = None,
  109. retries: int = 3,
  110. ) -> None:
  111. """Assert that the page does not contain an input with the given value."""
  112. assert self.client
  113. for _ in range(retries):
  114. with self.client:
  115. if not self.notify.contains(target) and not self._gather_elements(target, kind, marker, content):
  116. return
  117. await asyncio.sleep(0.05)
  118. raise AssertionError('expected not to see any ' + self._build_error_message(target, kind, marker, content))
  119. @overload
  120. def find(self,
  121. target: str,
  122. ) -> UserInteraction[Element]:
  123. ...
  124. @overload
  125. def find(self,
  126. target: Type[T],
  127. ) -> UserInteraction[T]:
  128. ...
  129. @overload
  130. def find(self: User,
  131. *,
  132. marker: Union[str, List[str], None] = None,
  133. content: Union[str, List[str], None] = None,
  134. ) -> UserInteraction[Element]:
  135. ...
  136. @overload
  137. def find(self,
  138. *,
  139. kind: Type[T],
  140. marker: Union[str, List[str], None] = None,
  141. content: Union[str, List[str], None] = None,
  142. ) -> UserInteraction[T]:
  143. ...
  144. def find(self,
  145. target: Union[str, Type[T], None] = None,
  146. *,
  147. kind: Optional[Type[T]] = None,
  148. marker: Union[str, List[str], None] = None,
  149. content: Union[str, List[str], None] = None,
  150. ) -> UserInteraction[T]:
  151. """Select elements for interaction."""
  152. assert self.client
  153. with self.client:
  154. elements = self._gather_elements(target, kind, marker, content)
  155. if not elements:
  156. raise AssertionError('expected to find at least one ' +
  157. self._build_error_message(target, kind, marker, content))
  158. return UserInteraction(self, elements, target)
  159. @property
  160. def current_layout(self) -> Element:
  161. """Return the root layout element of the current page."""
  162. assert self.client
  163. return self.client.layout
  164. def _gather_elements(self,
  165. target: Union[str, Type[T], None] = None,
  166. kind: Optional[Type[T]] = None,
  167. marker: Union[str, List[str], None] = None,
  168. content: Union[str, List[str], None] = None,
  169. ) -> Set[T]:
  170. if target is None:
  171. if kind is None:
  172. return set(ElementFilter(marker=marker, content=content)) # type: ignore
  173. return set(ElementFilter(kind=kind, marker=marker, content=content))
  174. elif isinstance(target, str):
  175. return set(ElementFilter(marker=target)).union(ElementFilter(content=target)) # type: ignore
  176. else:
  177. return set(ElementFilter(kind=target))
  178. def _build_error_message(self,
  179. target: Union[str, Type[T], None] = None,
  180. kind: Optional[Type[T]] = None,
  181. marker: Union[str, List[str], None] = None,
  182. content: Union[str, List[str], None] = None,
  183. ) -> str:
  184. if isinstance(target, str):
  185. return f'element with marker={target} or content={target} on the page:\n{self.current_layout}'
  186. elif target is not None:
  187. return f'element of type {target.__name__} on the page:\n{self.current_layout}'
  188. elif kind is not None:
  189. return f'element of type {kind.__name__} with {marker=} and {content=} on the page:\n{self.current_layout}'
  190. else:
  191. return f'element with {marker=} and {content=} on the page:\n{self.current_layout}'