123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- from __future__ import annotations
- import asyncio
- import re
- from typing import Any, List, Optional, Set, Type, TypeVar, Union, overload
- from uuid import uuid4
- import httpx
- import socketio
- from nicegui import Client, ElementFilter, ui
- from nicegui.element import Element
- from nicegui.nicegui import _on_handshake
- from .user_download import UserDownload
- from .user_interaction import UserInteraction
- from .user_navigate import UserNavigate
- from .user_notify import UserNotify
# pylint: disable=protected-access

# Type variable restricted to `Element` subclasses; it lets `User.find` tie the requested
# element type to the type parameter of the returned `UserInteraction`.
T = TypeVar('T', bound=Element)
class User:
    """Simulated user for a NiceGUI page.

    Pages are requested through the given ``httpx.AsyncClient`` and elements are looked up
    with ``ElementFilter`` inside the context of the client that belongs to the opened page.
    """

    # NOTE(review): never assigned inside this class -- presumably set by the surrounding test setup; confirm.
    current_user: Optional[User] = None

    def __init__(self, client: httpx.AsyncClient) -> None:
        """Create a user which requests pages through the given HTTP client.

        :param client: HTTP client used by ``open`` to fetch pages
        """
        self.http_client = client
        self.sio = socketio.AsyncClient()
        self.client: Optional[Client] = None  # assigned by ``open``
        self.back_history: List[str] = []
        self.forward_history: List[str] = []
        self.navigate = UserNavigate(self)
        self.notify = UserNotify()
        self.download = UserDownload(self)

    def __getattribute__(self, name: str) -> Any:
        """Return the requested attribute.

        As a side effect, every access to an attribute other than ``notify``, ``navigate`` and ``download``
        points ``ui.navigate``, ``ui.notify`` and ``ui.download`` to this user's own objects.
        """
        if name not in {'notify', 'navigate', 'download'}:  # NOTE: avoid infinite recursion
            ui.navigate = self.navigate
            ui.notify = self.notify
            ui.download = self.download
        return super().__getattribute__(name)

    async def open(self, path: str, *, clear_forward_history: bool = True) -> Client:
        """Open the given path.

        :param path: path to request through the HTTP client (redirects are followed)
        :param clear_forward_history: whether to empty the forward history after the page has been opened
        :return: the client which belongs to the opened page
        :raises AssertionError: if the status code is not 200 or the response text contains no client ID
        :raises ValueError: if the ``X-Nicegui-Content`` response header is not ``page``
        """
        response = await self.http_client.get(path, follow_redirects=True)
        assert response.status_code == 200, f'Expected status code 200, got {response.status_code}'
        if response.headers.get('X-Nicegui-Content') != 'page':
            raise ValueError(f'Expected a page response, got {response.text}')
        match = re.search(r"'client_id': '([0-9a-f-]+)'", response.text)
        assert match is not None, f'Expected to find a client ID in the response for {path}'
        self.client = Client.instances[match.group(1)]
        # NOTE(review): no handler is passed and the return value is discarded -- confirm this call is still needed
        self.sio.on('connect')
        await _on_handshake(f'test-{uuid4()}', {'client_id': self.client.id, 'tab_id': str(uuid4())})
        self.back_history.append(path)
        if clear_forward_history:
            self.forward_history.clear()
        return self.client

    @overload
    async def should_see(self,
                         target: Union[str, Type[T]],
                         *,
                         retries: int = 3,
                         ) -> None:
        ...

    @overload
    async def should_see(self,
                         *,
                         kind: Optional[Type[T]] = None,
                         marker: Union[str, List[str], None] = None,
                         content: Union[str, List[str], None] = None,
                         retries: int = 3,
                         ) -> None:
        ...

    async def should_see(self,
                         target: Union[str, Type[T], None] = None,
                         *,
                         kind: Optional[Type[T]] = None,
                         marker: Union[str, List[str], None] = None,
                         content: Union[str, List[str], None] = None,
                         retries: int = 3,
                         ) -> None:
        """Assert that the page contains an element fulfilling certain filter rules.

        Note that there is no scrolling in the user simulation -- the entire page is always *visible*.
        Due to asynchronous execution, sometimes the expected elements only appear after a short delay.
        By default `should_see` makes three attempts to find the element before failing.
        This can be adjusted with the `retries` parameter.
        The assertion is also fulfilled if ``self.notify.contains(target)`` is true.

        :param target: string to look up as marker or content, or an element type
        :param kind: element type to filter by (only used if no target is given)
        :param marker: marker(s) to filter by (only used if no target is given)
        :param content: content(s) to filter by (only used if no target is given)
        :param retries: number of attempts; values below 1 are treated as a single attempt
        :raises AssertionError: if no page has been opened or nothing matching was found in any attempt
        """
        assert self.client
        attempts = max(retries, 1)  # always inspect the page at least once
        for attempt in range(attempts):
            with self.client:
                if self.notify.contains(target) or self._gather_elements(target, kind, marker, content):
                    return
                if attempt + 1 < attempts:  # waiting after the final attempt would only delay the failure
                    await asyncio.sleep(0.1)
        raise AssertionError('expected to see at least one ' + self._build_error_message(target, kind, marker, content))

    @overload
    async def should_not_see(self,
                             target: Union[str, Type[T]],
                             *,
                             retries: int = 3,
                             ) -> None:
        ...

    @overload
    async def should_not_see(self,
                             *,
                             kind: Optional[Type[T]] = None,
                             marker: Union[str, List[str], None] = None,
                             content: Union[str, List[str], None] = None,
                             retries: int = 3,
                             ) -> None:
        ...

    async def should_not_see(self,
                             target: Union[str, Type[T], None] = None,
                             *,
                             kind: Optional[Type[T]] = None,
                             marker: Union[str, List[str], None] = None,
                             content: Union[str, List[str], None] = None,
                             retries: int = 3,
                             ) -> None:
        """Assert that the page contains no element fulfilling certain filter rules.

        The assertion is fulfilled as soon as one attempt finds neither a matching element
        nor a positive result from ``self.notify.contains(target)``.

        :param target: string to look up as marker or content, or an element type
        :param kind: element type to filter by (only used if no target is given)
        :param marker: marker(s) to filter by (only used if no target is given)
        :param content: content(s) to filter by (only used if no target is given)
        :param retries: number of attempts; values below 1 are treated as a single attempt
        :raises AssertionError: if no page has been opened or something matching was found in every attempt
        """
        assert self.client
        attempts = max(retries, 1)  # always inspect the page at least once
        for attempt in range(attempts):
            with self.client:
                if not self.notify.contains(target) and not self._gather_elements(target, kind, marker, content):
                    return
                if attempt + 1 < attempts:  # waiting after the final attempt would only delay the failure
                    await asyncio.sleep(0.05)
        raise AssertionError('expected not to see any ' + self._build_error_message(target, kind, marker, content))

    @overload
    def find(self,
             target: str,
             ) -> UserInteraction[Element]:
        ...

    @overload
    def find(self,
             target: Type[T],
             ) -> UserInteraction[T]:
        ...

    @overload
    def find(self: User,
             *,
             marker: Union[str, List[str], None] = None,
             content: Union[str, List[str], None] = None,
             ) -> UserInteraction[Element]:
        ...

    @overload
    def find(self,
             *,
             kind: Type[T],
             marker: Union[str, List[str], None] = None,
             content: Union[str, List[str], None] = None,
             ) -> UserInteraction[T]:
        ...

    def find(self,
             target: Union[str, Type[T], None] = None,
             *,
             kind: Optional[Type[T]] = None,
             marker: Union[str, List[str], None] = None,
             content: Union[str, List[str], None] = None,
             ) -> UserInteraction[T]:
        """Select elements for interaction.

        :param target: string to look up as marker or content, or an element type
        :param kind: element type to filter by (only used if no target is given)
        :param marker: marker(s) to filter by (only used if no target is given)
        :param content: content(s) to filter by (only used if no target is given)
        :return: interaction object wrapping all matching elements
        :raises AssertionError: if no page has been opened or no element matches
        """
        assert self.client
        with self.client:
            elements = self._gather_elements(target, kind, marker, content)
            if not elements:
                raise AssertionError('expected to find at least one ' +
                                     self._build_error_message(target, kind, marker, content))
        return UserInteraction(self, elements, target)

    @property
    def current_layout(self) -> Element:
        """Return the root layout element of the current page."""
        assert self.client
        return self.client.layout

    def _gather_elements(self,
                         target: Union[str, Type[T], None] = None,
                         kind: Optional[Type[T]] = None,
                         marker: Union[str, List[str], None] = None,
                         content: Union[str, List[str], None] = None,
                         ) -> Set[T]:
        """Collect the elements matching the given filter rules.

        A string target matches elements by marker *or* by content, any other target is used as element type.
        ``kind``, ``marker`` and ``content`` are only evaluated if no target is given.
        """
        if isinstance(target, str):
            return set(ElementFilter(marker=target)).union(ElementFilter(content=target))  # type: ignore
        if target is not None:
            return set(ElementFilter(kind=target))
        if kind is None:
            return set(ElementFilter(marker=marker, content=content))  # type: ignore
        return set(ElementFilter(kind=kind, marker=marker, content=content))

    def _build_error_message(self,
                             target: Union[str, Type[T], None] = None,
                             kind: Optional[Type[T]] = None,
                             marker: Union[str, List[str], None] = None,
                             content: Union[str, List[str], None] = None,
                             ) -> str:
        """Describe the filter rules and the current page layout for use in an assertion message."""
        if isinstance(target, str):
            return f'element with marker={target} or content={target} on the page:\n{self.current_layout}'
        if target is not None:
            return f'element of type {target.__name__} on the page:\n{self.current_layout}'
        if kind is not None:
            return f'element of type {kind.__name__} with {marker=} and {content=} on the page:\n{self.current_layout}'
        return f'element with {marker=} and {content=} on the page:\n{self.current_layout}'
|