123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 |
- """Integration tests for file upload."""
- from __future__ import annotations
- import asyncio
- import time
- from pathlib import Path
- from typing import Generator
- import pytest
- from selenium.webdriver.common.by import By
- from reflex.testing import AppHarness, WebDriver
def UploadFile():
    """App for testing file uploads.

    This function is passed as ``app_source`` to ``AppHarness.create`` by the
    ``upload_file`` fixture, so everything the app needs (imports included)
    is defined inside the function body.

    The page contains two upload forms:

    * a default form whose "Upload" button calls ``handle_upload``;
    * a "secondary" form (upload id ``"secondary"``) whose "Upload" button
      calls ``handle_upload_secondary`` with progress reporting, plus a
      "Cancel" button for the in-flight upload.
    """
    from typing import Dict, List

    import reflex as rx

    # Value parked on the state by handle_upload_secondary and verified by
    # chain_event ("DUMMY" repeated 1024 * 512 times).
    LARGE_DATA = "DUMMY" * 1024 * 512

    class UploadState(rx.State):
        """State recording uploaded file contents and the events that ran."""

        # filename -> file contents decoded as utf-8
        _file_data: Dict[str, str] = {}
        # names of the handlers that ran ("upload_progress", "chain_event")
        event_order: List[str] = []
        # every payload passed to upload_progress
        progress_dicts: List[dict] = []
        # bound to the `disabled` prop of the default upload form
        disabled: bool = False
        # set to LARGE_DATA by handle_upload_secondary, reset by chain_event
        large_data: str = ""

        async def handle_upload(self, files: List[rx.UploadFile]):
            """Store the decoded contents of each uploaded file.

            Args:
                files: the uploaded files.
            """
            for file in files:
                upload_data = await file.read()
                # a missing filename is stored under the empty string
                self._file_data[file.filename or ""] = upload_data.decode("utf-8")

        async def handle_upload_secondary(self, files: List[rx.UploadFile]):
            """Store the uploaded files, then hand off to ``chain_event``.

            Args:
                files: the uploaded files.

            Yields:
                The ``chain_event`` handler.
            """
            for file in files:
                upload_data = await file.read()
                self._file_data[file.filename or ""] = upload_data.decode("utf-8")
            # chain_event asserts that this value is still present when it runs
            self.large_data = LARGE_DATA
            yield UploadState.chain_event

        def upload_progress(self, progress):
            """Record a progress payload from the secondary upload form.

            Args:
                progress: the progress payload; must be truthy.
            """
            assert progress
            self.event_order.append("upload_progress")
            self.progress_dicts.append(progress)

        def chain_event(self):
            """Check and clear ``large_data``, then record that this ran."""
            assert self.large_data == LARGE_DATA
            self.large_data = ""
            self.event_order.append("chain_event")

    def index():
        """Build the page holding the default and secondary upload forms."""
        return rx.vstack(
            # read-only field the tests poll to obtain the client token
            rx.input(
                value=UploadState.router.session.client_token,
                read_only=True,
                id="token",
            ),
            rx.heading("Default Upload"),
            rx.upload.root(
                rx.vstack(
                    rx.button("Select File"),
                    rx.text("Drag and drop files here or click to select files"),
                ),
                disabled=UploadState.disabled,
            ),
            rx.button(
                "Upload",
                on_click=lambda: UploadState.handle_upload(rx.upload_files()),  # type: ignore
                id="upload_button",
            ),
            # one <p> per selected file of the default form
            rx.box(
                rx.foreach(
                    rx.selected_files,
                    lambda f: rx.text(f, as_="p"),
                ),
                id="selected_files",
            ),
            rx.button(
                "Clear",
                on_click=rx.clear_selected_files,
                id="clear_button",
            ),
            rx.heading("Secondary Upload"),
            rx.upload.root(
                rx.vstack(
                    rx.button("Select File"),
                    rx.text("Drag and drop files here or click to select files"),
                ),
                id="secondary",
            ),
            rx.button(
                "Upload",
                on_click=UploadState.handle_upload_secondary(  # type: ignore
                    rx.upload_files(
                        upload_id="secondary",
                        on_upload_progress=UploadState.upload_progress,
                    ),
                ),
                id="upload_button_secondary",
            ),
            # one <p> per selected file of the secondary form
            rx.box(
                rx.foreach(
                    rx.selected_files("secondary"),
                    lambda f: rx.text(f, as_="p"),
                ),
                id="selected_files_secondary",
            ),
            rx.button(
                "Clear",
                on_click=rx.clear_selected_files("secondary"),
                id="clear_button_secondary",
            ),
            # renders every progress payload recorded by upload_progress
            rx.vstack(
                rx.foreach(
                    UploadState.progress_dicts,  # type: ignore
                    lambda d: rx.text(d.to_string()),
                )
            ),
            rx.button(
                "Cancel",
                on_click=rx.cancel_upload("secondary"),
                id="cancel_button_secondary",
            ),
        )

    app = rx.App(state=rx.State)
    app.add_page(index)
@pytest.fixture(scope="module")
def upload_file(tmp_path_factory) -> Generator[AppHarness, None, None]:
    """Run the UploadFile app under an AppHarness, shared by the module.

    Args:
        tmp_path_factory: pytest tmp_path_factory fixture

    Yields:
        running AppHarness instance
    """
    # dedicated temp directory used as the app root
    app_root = tmp_path_factory.mktemp("upload_file")
    with AppHarness.create(root=app_root, app_source=UploadFile) as running_app:
        yield running_app
@pytest.fixture
def driver(upload_file: AppHarness) -> Generator[WebDriver, None, None]:
    """Get an instance of the browser open to the upload_file app.

    Args:
        upload_file: harness for UploadFile app

    Yields:
        WebDriver instance.
    """
    assert upload_file.app_instance is not None, "app is not running"
    browser = upload_file.frontend()
    try:
        yield browser
    finally:
        # always close the browser, even when the test using it fails
        browser.quit()
@pytest.mark.parametrize("secondary", [False, True])
@pytest.mark.asyncio
async def test_upload_file(
    tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
):
    """Submit a file upload and check that it arrived on the backend.

    Args:
        tmp_path: pytest tmp_path fixture
        upload_file: harness for UploadFile app.
        driver: WebDriver instance.
        secondary: whether to use the secondary upload form
    """
    assert upload_file.app_instance is not None
    token_input = driver.find_element(By.ID, "token")
    assert token_input
    # wait for the backend connection to send the token
    token = upload_file.poll_for_value(token_input)
    assert token is not None
    full_state_name = upload_file.get_full_state_name(["_upload_state"])
    state_name = upload_file.get_state_name("_upload_state")
    substate_token = f"{token}_{full_state_name}"

    suffix = "_secondary" if secondary else ""

    # the page has two file inputs: [0] default form, [1] secondary form
    upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
        1 if secondary else 0
    ]
    assert upload_box
    upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
    assert upload_button

    exp_name = "test.txt"
    exp_contents = "test file contents!"
    target_file = tmp_path / exp_name
    target_file.write_text(exp_contents)

    upload_box.send_keys(str(target_file))
    upload_button.click()

    # look up the backend state and assert on uploaded contents
    async def get_file_data():
        return (
            (await upload_file.get_state(substate_token))
            .substates[state_name]
            ._file_data
        )

    file_data = await AppHarness._poll_for_async(get_file_data)
    assert isinstance(file_data, dict)
    # keys may carry a path prefix; compare on the bare file name
    normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
    assert normalized_file_data[Path(exp_name).name] == exp_contents

    # check that the selected files are displayed
    selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
    assert Path(selected_files.text).name == Path(exp_name).name

    if secondary:
        # only the secondary form tracks progress and chain events

        async def get_event_order():
            # The progress and chained events are recorded by separate
            # handlers, so they may land after _file_data is already
            # visible. Report a result only once both have been recorded.
            state = await upload_file.get_state(substate_token)
            event_order = state.substates[state_name].event_order
            if "upload_progress" in event_order and "chain_event" in event_order:
                return list(event_order)
            return None

        event_order = await AppHarness._poll_for_async(get_event_order)
        assert event_order, "timed out waiting for upload_progress and chain_event"
        assert event_order.count("upload_progress") == 1
        assert event_order.count("chain_event") == 1
@pytest.mark.asyncio
async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
    """Submit several file uploads and check that they arrived on the backend.

    Args:
        tmp_path: pytest tmp_path fixture
        upload_file: harness for UploadFile app.
        driver: WebDriver instance.
    """
    assert upload_file.app_instance is not None
    token_input = driver.find_element(By.ID, "token")
    assert token_input
    # wait for the backend connection to send the token
    token = upload_file.poll_for_value(token_input)
    assert token is not None
    full_state_name = upload_file.get_full_state_name(["_upload_state"])
    state_name = upload_file.get_state_name("_upload_state")
    substate_token = f"{token}_{full_state_name}"

    # first file input on the page, i.e. the default upload form
    upload_box = driver.find_element(By.XPATH, "//input[@type='file']")
    assert upload_box
    upload_button = driver.find_element(By.ID, "upload_button")
    assert upload_button

    exp_files = {
        "test1.txt": "test file contents!",
        "test2.txt": "this is test file number 2!",
        "reflex.txt": "reflex is awesome!",
    }
    for exp_name, exp_contents in exp_files.items():
        target_file = tmp_path / exp_name
        target_file.write_text(exp_contents)
        upload_box.send_keys(str(target_file))

    # give the page a moment to list the selection; use the non-blocking
    # sleep because this test is a coroutine
    await asyncio.sleep(0.2)

    # check that the selected files are displayed
    selected_files = driver.find_element(By.ID, "selected_files")
    assert [Path(name).name for name in selected_files.text.split("\n")] == [
        Path(name).name for name in exp_files
    ]

    # do the upload
    upload_button.click()

    # look up the backend state and assert on uploaded contents
    async def get_file_data():
        return (
            (await upload_file.get_state(substate_token))
            .substates[state_name]
            ._file_data
        )

    file_data = await AppHarness._poll_for_async(get_file_data)
    assert isinstance(file_data, dict)
    # keys may carry a path prefix; compare on the bare file name
    normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
    for exp_name, exp_contents in exp_files.items():
        assert normalized_file_data[Path(exp_name).name] == exp_contents
@pytest.mark.parametrize("secondary", [False, True])
def test_clear_files(
    tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
):
    """Select several files, press "Clear", and check the selection is empty.

    Args:
        tmp_path: pytest tmp_path fixture
        upload_file: harness for UploadFile app.
        driver: WebDriver instance.
        secondary: whether to use the secondary upload form.
    """
    assert upload_file.app_instance is not None

    token_field = driver.find_element(By.ID, "token")
    assert token_field
    # block until the backend connection has delivered the token
    assert upload_file.poll_for_value(token_field) is not None

    # pick the form under test: file input position and element id suffix
    if secondary:
        form_index, suffix = 1, "_secondary"
    else:
        form_index, suffix = 0, ""

    file_inputs = driver.find_elements(By.XPATH, "//input[@type='file']")
    file_input = file_inputs[form_index]
    assert file_input
    assert driver.find_element(By.ID, f"upload_button{suffix}")

    exp_files = {
        "test1.txt": "test file contents!",
        "test2.txt": "this is test file number 2!",
        "reflex.txt": "reflex is awesome!",
    }
    for filename, contents in exp_files.items():
        path = tmp_path / filename
        path.write_text(contents)
        file_input.send_keys(str(path))

    time.sleep(0.2)

    # the selection list should show every chosen file, in order
    listing = driver.find_element(By.ID, f"selected_files{suffix}")
    shown_names = [Path(line).name for line in listing.text.split("\n")]
    expected_names = [Path(filename).name for filename in exp_files]
    assert shown_names == expected_names

    clear = driver.find_element(By.ID, f"clear_button{suffix}")
    assert clear
    clear.click()

    # after clearing, the selection list should be empty
    listing = driver.find_element(By.ID, f"selected_files{suffix}")
    assert listing.text == ""
- # TODO: drag and drop directory
- # https://gist.github.com/florentbr/349b1ab024ca9f3de56e6bf8af2ac69e
@pytest.mark.asyncio
async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDriver):
    """Submit a large file upload and cancel it.

    Args:
        tmp_path: pytest tmp_path fixture
        upload_file: harness for UploadFile app.
        driver: WebDriver instance.
    """
    assert upload_file.app_instance is not None
    token_input = driver.find_element(By.ID, "token")
    assert token_input
    # wait for the backend connection to send the token
    token = upload_file.poll_for_value(token_input)
    assert token is not None
    state_name = upload_file.get_state_name("_upload_state")
    state_full_name = upload_file.get_full_state_name(["_upload_state"])
    substate_token = f"{token}_{state_full_name}"

    # second file input on the page, i.e. the secondary upload form
    upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[1]
    upload_button = driver.find_element(By.ID, "upload_button_secondary")
    cancel_button = driver.find_element(By.ID, "cancel_button_secondary")

    exp_name = "large.txt"
    target_file = tmp_path / exp_name
    # seek 256 MiB in and write one byte, producing a 256 MiB + 1 byte file
    with target_file.open("wb") as f:
        f.seek(1024 * 1024 * 256)
        f.write(b"0")

    try:
        upload_box.send_keys(str(target_file))
        upload_button.click()
        await asyncio.sleep(0.3)
        cancel_button.click()

        # Wait a bit for the upload to get cancelled.
        await asyncio.sleep(0.5)

        # Get interim progress dicts saved in the on_upload_progress handler.
        async def _progress_dicts():
            state = await upload_file.get_state(substate_token)
            return state.substates[state_name].progress_dicts

        # We should have _some_ progress
        assert await AppHarness._poll_for_async(_progress_dicts)

        # But there should never be a final progress record for a cancelled upload.
        for p in await _progress_dicts():
            assert p["progress"] != 1

        state = await upload_file.get_state(substate_token)
        file_data = state.substates[state_name]._file_data
        assert isinstance(file_data, dict)
        normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
        assert Path(exp_name).name not in normalized_file_data
    finally:
        # remove the large file even when an assertion above fails, so it
        # does not linger in the retained pytest temp directory
        target_file.unlink()
|