test_upload.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. """Integration tests for file upload."""
  2. from __future__ import annotations
  3. import asyncio
  4. import time
  5. from pathlib import Path
  6. from typing import Generator
  7. import pytest
  8. from selenium.webdriver.common.by import By
  9. from reflex.testing import AppHarness, WebDriver
  10. def UploadFile():
  11. """App for testing dynamic routes."""
  12. from typing import Dict, List
  13. import reflex as rx
  14. LARGE_DATA = "DUMMY" * 1024 * 512
  15. class UploadState(rx.State):
  16. _file_data: Dict[str, str] = {}
  17. event_order: List[str] = []
  18. progress_dicts: List[dict] = []
  19. disabled: bool = False
  20. large_data: str = ""
  21. async def handle_upload(self, files: List[rx.UploadFile]):
  22. for file in files:
  23. upload_data = await file.read()
  24. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  25. async def handle_upload_secondary(self, files: List[rx.UploadFile]):
  26. for file in files:
  27. upload_data = await file.read()
  28. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  29. self.large_data = LARGE_DATA
  30. yield UploadState.chain_event
  31. def upload_progress(self, progress):
  32. assert progress
  33. self.event_order.append("upload_progress")
  34. self.progress_dicts.append(progress)
  35. def chain_event(self):
  36. assert self.large_data == LARGE_DATA
  37. self.large_data = ""
  38. self.event_order.append("chain_event")
  39. def index():
  40. return rx.vstack(
  41. rx.input(
  42. value=UploadState.router.session.client_token,
  43. read_only=True,
  44. id="token",
  45. ),
  46. rx.heading("Default Upload"),
  47. rx.upload.root(
  48. rx.vstack(
  49. rx.button("Select File"),
  50. rx.text("Drag and drop files here or click to select files"),
  51. ),
  52. disabled=UploadState.disabled,
  53. ),
  54. rx.button(
  55. "Upload",
  56. on_click=lambda: UploadState.handle_upload(rx.upload_files()), # type: ignore
  57. id="upload_button",
  58. ),
  59. rx.box(
  60. rx.foreach(
  61. rx.selected_files,
  62. lambda f: rx.text(f, as_="p"),
  63. ),
  64. id="selected_files",
  65. ),
  66. rx.button(
  67. "Clear",
  68. on_click=rx.clear_selected_files,
  69. id="clear_button",
  70. ),
  71. rx.heading("Secondary Upload"),
  72. rx.upload.root(
  73. rx.vstack(
  74. rx.button("Select File"),
  75. rx.text("Drag and drop files here or click to select files"),
  76. ),
  77. id="secondary",
  78. ),
  79. rx.button(
  80. "Upload",
  81. on_click=UploadState.handle_upload_secondary( # type: ignore
  82. rx.upload_files(
  83. upload_id="secondary",
  84. on_upload_progress=UploadState.upload_progress,
  85. ),
  86. ),
  87. id="upload_button_secondary",
  88. ),
  89. rx.box(
  90. rx.foreach(
  91. rx.selected_files("secondary"),
  92. lambda f: rx.text(f, as_="p"),
  93. ),
  94. id="selected_files_secondary",
  95. ),
  96. rx.button(
  97. "Clear",
  98. on_click=rx.clear_selected_files("secondary"),
  99. id="clear_button_secondary",
  100. ),
  101. rx.vstack(
  102. rx.foreach(
  103. UploadState.progress_dicts, # type: ignore
  104. lambda d: rx.text(d.to_string()),
  105. )
  106. ),
  107. rx.button(
  108. "Cancel",
  109. on_click=rx.cancel_upload("secondary"),
  110. id="cancel_button_secondary",
  111. ),
  112. )
  113. app = rx.App(state=rx.State)
  114. app.add_page(index)
  115. @pytest.fixture(scope="module")
  116. def upload_file(tmp_path_factory) -> Generator[AppHarness, None, None]:
  117. """Start UploadFile app at tmp_path via AppHarness.
  118. Args:
  119. tmp_path_factory: pytest tmp_path_factory fixture
  120. Yields:
  121. running AppHarness instance
  122. """
  123. with AppHarness.create(
  124. root=tmp_path_factory.mktemp("upload_file"),
  125. app_source=UploadFile,
  126. ) as harness:
  127. yield harness
  128. @pytest.fixture
  129. def driver(upload_file: AppHarness):
  130. """Get an instance of the browser open to the upload_file app.
  131. Args:
  132. upload_file: harness for DynamicRoute app
  133. Yields:
  134. WebDriver instance.
  135. """
  136. assert upload_file.app_instance is not None, "app is not running"
  137. driver = upload_file.frontend()
  138. try:
  139. yield driver
  140. finally:
  141. driver.quit()
  142. @pytest.mark.parametrize("secondary", [False, True])
  143. @pytest.mark.asyncio
  144. async def test_upload_file(
  145. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  146. ):
  147. """Submit a file upload and check that it arrived on the backend.
  148. Args:
  149. tmp_path: pytest tmp_path fixture
  150. upload_file: harness for UploadFile app.
  151. driver: WebDriver instance.
  152. secondary: whether to use the secondary upload form
  153. """
  154. assert upload_file.app_instance is not None
  155. token_input = driver.find_element(By.ID, "token")
  156. assert token_input
  157. # wait for the backend connection to send the token
  158. token = upload_file.poll_for_value(token_input)
  159. assert token is not None
  160. full_state_name = upload_file.get_full_state_name(["_upload_state"])
  161. state_name = upload_file.get_state_name("_upload_state")
  162. substate_token = f"{token}_{full_state_name}"
  163. suffix = "_secondary" if secondary else ""
  164. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  165. 1 if secondary else 0
  166. ]
  167. assert upload_box
  168. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  169. assert upload_button
  170. exp_name = "test.txt"
  171. exp_contents = "test file contents!"
  172. target_file = tmp_path / exp_name
  173. target_file.write_text(exp_contents)
  174. upload_box.send_keys(str(target_file))
  175. upload_button.click()
  176. # look up the backend state and assert on uploaded contents
  177. async def get_file_data():
  178. return (
  179. (await upload_file.get_state(substate_token))
  180. .substates[state_name]
  181. ._file_data
  182. )
  183. file_data = await AppHarness._poll_for_async(get_file_data)
  184. assert isinstance(file_data, dict)
  185. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  186. assert normalized_file_data[Path(exp_name).name] == exp_contents
  187. # check that the selected files are displayed
  188. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  189. assert Path(selected_files.text).name == Path(exp_name).name
  190. state = await upload_file.get_state(substate_token)
  191. if secondary:
  192. # only the secondary form tracks progress and chain events
  193. assert state.substates[state_name].event_order.count("upload_progress") == 1
  194. assert state.substates[state_name].event_order.count("chain_event") == 1
  195. @pytest.mark.asyncio
  196. async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
  197. """Submit several file uploads and check that they arrived on the backend.
  198. Args:
  199. tmp_path: pytest tmp_path fixture
  200. upload_file: harness for UploadFile app.
  201. driver: WebDriver instance.
  202. """
  203. assert upload_file.app_instance is not None
  204. token_input = driver.find_element(By.ID, "token")
  205. assert token_input
  206. # wait for the backend connection to send the token
  207. token = upload_file.poll_for_value(token_input)
  208. assert token is not None
  209. full_state_name = upload_file.get_full_state_name(["_upload_state"])
  210. state_name = upload_file.get_state_name("_upload_state")
  211. substate_token = f"{token}_{full_state_name}"
  212. upload_box = driver.find_element(By.XPATH, "//input[@type='file']")
  213. assert upload_box
  214. upload_button = driver.find_element(By.ID, "upload_button")
  215. assert upload_button
  216. exp_files = {
  217. "test1.txt": "test file contents!",
  218. "test2.txt": "this is test file number 2!",
  219. "reflex.txt": "reflex is awesome!",
  220. }
  221. for exp_name, exp_contents in exp_files.items():
  222. target_file = tmp_path / exp_name
  223. target_file.write_text(exp_contents)
  224. upload_box.send_keys(str(target_file))
  225. time.sleep(0.2)
  226. # check that the selected files are displayed
  227. selected_files = driver.find_element(By.ID, "selected_files")
  228. assert [Path(name).name for name in selected_files.text.split("\n")] == [
  229. Path(name).name for name in exp_files
  230. ]
  231. # do the upload
  232. upload_button.click()
  233. # look up the backend state and assert on uploaded contents
  234. async def get_file_data():
  235. return (
  236. (await upload_file.get_state(substate_token))
  237. .substates[state_name]
  238. ._file_data
  239. )
  240. file_data = await AppHarness._poll_for_async(get_file_data)
  241. assert isinstance(file_data, dict)
  242. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  243. for exp_name, exp_contents in exp_files.items():
  244. assert normalized_file_data[Path(exp_name).name] == exp_contents
  245. @pytest.mark.parametrize("secondary", [False, True])
  246. def test_clear_files(
  247. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  248. ):
  249. """Select then clear several file uploads and check that they are cleared.
  250. Args:
  251. tmp_path: pytest tmp_path fixture
  252. upload_file: harness for UploadFile app.
  253. driver: WebDriver instance.
  254. secondary: whether to use the secondary upload form.
  255. """
  256. assert upload_file.app_instance is not None
  257. token_input = driver.find_element(By.ID, "token")
  258. assert token_input
  259. # wait for the backend connection to send the token
  260. token = upload_file.poll_for_value(token_input)
  261. assert token is not None
  262. suffix = "_secondary" if secondary else ""
  263. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  264. 1 if secondary else 0
  265. ]
  266. assert upload_box
  267. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  268. assert upload_button
  269. exp_files = {
  270. "test1.txt": "test file contents!",
  271. "test2.txt": "this is test file number 2!",
  272. "reflex.txt": "reflex is awesome!",
  273. }
  274. for exp_name, exp_contents in exp_files.items():
  275. target_file = tmp_path / exp_name
  276. target_file.write_text(exp_contents)
  277. upload_box.send_keys(str(target_file))
  278. time.sleep(0.2)
  279. # check that the selected files are displayed
  280. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  281. assert [Path(name).name for name in selected_files.text.split("\n")] == [
  282. Path(name).name for name in exp_files
  283. ]
  284. clear_button = driver.find_element(By.ID, f"clear_button{suffix}")
  285. assert clear_button
  286. clear_button.click()
  287. # check that the selected files are cleared
  288. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  289. assert selected_files.text == ""
  290. # TODO: drag and drop directory
  291. # https://gist.github.com/florentbr/349b1ab024ca9f3de56e6bf8af2ac69e
  292. @pytest.mark.asyncio
  293. async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDriver):
  294. """Submit a large file upload and cancel it.
  295. Args:
  296. tmp_path: pytest tmp_path fixture
  297. upload_file: harness for UploadFile app.
  298. driver: WebDriver instance.
  299. """
  300. assert upload_file.app_instance is not None
  301. token_input = driver.find_element(By.ID, "token")
  302. assert token_input
  303. # wait for the backend connection to send the token
  304. token = upload_file.poll_for_value(token_input)
  305. assert token is not None
  306. state_name = upload_file.get_state_name("_upload_state")
  307. state_full_name = upload_file.get_full_state_name(["_upload_state"])
  308. substate_token = f"{token}_{state_full_name}"
  309. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[1]
  310. upload_button = driver.find_element(By.ID, "upload_button_secondary")
  311. cancel_button = driver.find_element(By.ID, "cancel_button_secondary")
  312. exp_name = "large.txt"
  313. target_file = tmp_path / exp_name
  314. with target_file.open("wb") as f:
  315. f.seek(1024 * 1024 * 256)
  316. f.write(b"0")
  317. upload_box.send_keys(str(target_file))
  318. upload_button.click()
  319. await asyncio.sleep(0.3)
  320. cancel_button.click()
  321. # Wait a bit for the upload to get cancelled.
  322. await asyncio.sleep(0.5)
  323. # Get interim progress dicts saved in the on_upload_progress handler.
  324. async def _progress_dicts():
  325. state = await upload_file.get_state(substate_token)
  326. return state.substates[state_name].progress_dicts
  327. # We should have _some_ progress
  328. assert await AppHarness._poll_for_async(_progress_dicts)
  329. # But there should never be a final progress record for a cancelled upload.
  330. for p in await _progress_dicts():
  331. assert p["progress"] != 1
  332. state = await upload_file.get_state(substate_token)
  333. file_data = state.substates[state_name]._file_data
  334. assert isinstance(file_data, dict)
  335. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  336. assert Path(exp_name).name not in normalized_file_data
  337. target_file.unlink()