test_upload.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. """Integration tests for file upload."""
  2. from __future__ import annotations
  3. import asyncio
  4. import time
  5. from pathlib import Path
  6. from typing import Generator
  7. import pytest
  8. from selenium.webdriver.common.by import By
  9. from reflex.testing import AppHarness, WebDriver
  10. def UploadFile():
  11. """App for testing dynamic routes."""
  12. from typing import Dict, List
  13. import reflex as rx
  14. class UploadState(rx.State):
  15. _file_data: Dict[str, str] = {}
  16. event_order: List[str] = []
  17. progress_dicts: List[dict] = []
  18. async def handle_upload(self, files: List[rx.UploadFile]):
  19. for file in files:
  20. upload_data = await file.read()
  21. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  22. async def handle_upload_secondary(self, files: List[rx.UploadFile]):
  23. for file in files:
  24. upload_data = await file.read()
  25. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  26. yield UploadState.chain_event
  27. def upload_progress(self, progress):
  28. assert progress
  29. self.event_order.append("upload_progress")
  30. self.progress_dicts.append(progress)
  31. def chain_event(self):
  32. self.event_order.append("chain_event")
  33. def index():
  34. return rx.vstack(
  35. rx.input(
  36. value=UploadState.router.session.client_token,
  37. is_read_only=True,
  38. id="token",
  39. ),
  40. rx.heading("Default Upload"),
  41. rx.upload.root(
  42. rx.vstack(
  43. rx.button("Select File"),
  44. rx.text("Drag and drop files here or click to select files"),
  45. ),
  46. ),
  47. rx.button(
  48. "Upload",
  49. on_click=lambda: UploadState.handle_upload(rx.upload_files()), # type: ignore
  50. id="upload_button",
  51. ),
  52. rx.box(
  53. rx.foreach(
  54. rx.selected_files,
  55. lambda f: rx.text(f, as_="p"),
  56. ),
  57. id="selected_files",
  58. ),
  59. rx.button(
  60. "Clear",
  61. on_click=rx.clear_selected_files,
  62. id="clear_button",
  63. ),
  64. rx.heading("Secondary Upload"),
  65. rx.upload.root(
  66. rx.vstack(
  67. rx.button("Select File"),
  68. rx.text("Drag and drop files here or click to select files"),
  69. ),
  70. id="secondary",
  71. ),
  72. rx.button(
  73. "Upload",
  74. on_click=UploadState.handle_upload_secondary( # type: ignore
  75. rx.upload_files(
  76. upload_id="secondary",
  77. on_upload_progress=UploadState.upload_progress,
  78. ),
  79. ),
  80. id="upload_button_secondary",
  81. ),
  82. rx.box(
  83. rx.foreach(
  84. rx.selected_files("secondary"),
  85. lambda f: rx.text(f, as_="p"),
  86. ),
  87. id="selected_files_secondary",
  88. ),
  89. rx.button(
  90. "Clear",
  91. on_click=rx.clear_selected_files("secondary"),
  92. id="clear_button_secondary",
  93. ),
  94. rx.vstack(
  95. rx.foreach(
  96. UploadState.progress_dicts, # type: ignore
  97. lambda d: rx.text(d.to_string()),
  98. )
  99. ),
  100. rx.button(
  101. "Cancel",
  102. on_click=rx.cancel_upload("secondary"),
  103. id="cancel_button_secondary",
  104. ),
  105. )
  106. app = rx.App(state=rx.State)
  107. app.add_page(index)
  108. @pytest.fixture(scope="module")
  109. def upload_file(tmp_path_factory) -> Generator[AppHarness, None, None]:
  110. """Start UploadFile app at tmp_path via AppHarness.
  111. Args:
  112. tmp_path_factory: pytest tmp_path_factory fixture
  113. Yields:
  114. running AppHarness instance
  115. """
  116. with AppHarness.create(
  117. root=tmp_path_factory.mktemp("upload_file"),
  118. app_source=UploadFile, # type: ignore
  119. ) as harness:
  120. yield harness
  121. @pytest.fixture
  122. def driver(upload_file: AppHarness):
  123. """Get an instance of the browser open to the upload_file app.
  124. Args:
  125. upload_file: harness for DynamicRoute app
  126. Yields:
  127. WebDriver instance.
  128. """
  129. assert upload_file.app_instance is not None, "app is not running"
  130. driver = upload_file.frontend()
  131. try:
  132. yield driver
  133. finally:
  134. driver.quit()
  135. @pytest.mark.parametrize("secondary", [False, True])
  136. @pytest.mark.asyncio
  137. async def test_upload_file(
  138. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  139. ):
  140. """Submit a file upload and check that it arrived on the backend.
  141. Args:
  142. tmp_path: pytest tmp_path fixture
  143. upload_file: harness for UploadFile app.
  144. driver: WebDriver instance.
  145. secondary: whether to use the secondary upload form
  146. """
  147. assert upload_file.app_instance is not None
  148. token_input = driver.find_element(By.ID, "token")
  149. assert token_input
  150. # wait for the backend connection to send the token
  151. token = upload_file.poll_for_value(token_input)
  152. assert token is not None
  153. full_state_name = upload_file.get_full_state_name(["_upload_state"])
  154. state_name = upload_file.get_state_name("_upload_state")
  155. substate_token = f"{token}_{full_state_name}"
  156. suffix = "_secondary" if secondary else ""
  157. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  158. 1 if secondary else 0
  159. ]
  160. assert upload_box
  161. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  162. assert upload_button
  163. exp_name = "test.txt"
  164. exp_contents = "test file contents!"
  165. target_file = tmp_path / exp_name
  166. target_file.write_text(exp_contents)
  167. upload_box.send_keys(str(target_file))
  168. upload_button.click()
  169. # look up the backend state and assert on uploaded contents
  170. async def get_file_data():
  171. return (
  172. (await upload_file.get_state(substate_token))
  173. .substates[state_name]
  174. ._file_data
  175. )
  176. file_data = await AppHarness._poll_for_async(get_file_data)
  177. assert isinstance(file_data, dict)
  178. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  179. assert normalized_file_data[Path(exp_name).name] == exp_contents
  180. # check that the selected files are displayed
  181. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  182. assert Path(selected_files.text).name == Path(exp_name).name
  183. state = await upload_file.get_state(substate_token)
  184. if secondary:
  185. # only the secondary form tracks progress and chain events
  186. assert state.substates[state_name].event_order.count("upload_progress") == 1
  187. assert state.substates[state_name].event_order.count("chain_event") == 1
  188. @pytest.mark.asyncio
  189. async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
  190. """Submit several file uploads and check that they arrived on the backend.
  191. Args:
  192. tmp_path: pytest tmp_path fixture
  193. upload_file: harness for UploadFile app.
  194. driver: WebDriver instance.
  195. """
  196. assert upload_file.app_instance is not None
  197. token_input = driver.find_element(By.ID, "token")
  198. assert token_input
  199. # wait for the backend connection to send the token
  200. token = upload_file.poll_for_value(token_input)
  201. assert token is not None
  202. full_state_name = upload_file.get_full_state_name(["_upload_state"])
  203. state_name = upload_file.get_state_name("_upload_state")
  204. substate_token = f"{token}_{full_state_name}"
  205. upload_box = driver.find_element(By.XPATH, "//input[@type='file']")
  206. assert upload_box
  207. upload_button = driver.find_element(By.ID, "upload_button")
  208. assert upload_button
  209. exp_files = {
  210. "test1.txt": "test file contents!",
  211. "test2.txt": "this is test file number 2!",
  212. "reflex.txt": "reflex is awesome!",
  213. }
  214. for exp_name, exp_contents in exp_files.items():
  215. target_file = tmp_path / exp_name
  216. target_file.write_text(exp_contents)
  217. upload_box.send_keys(str(target_file))
  218. time.sleep(0.2)
  219. # check that the selected files are displayed
  220. selected_files = driver.find_element(By.ID, "selected_files")
  221. assert [Path(name).name for name in selected_files.text.split("\n")] == [
  222. Path(name).name for name in exp_files
  223. ]
  224. # do the upload
  225. upload_button.click()
  226. # look up the backend state and assert on uploaded contents
  227. async def get_file_data():
  228. return (
  229. (await upload_file.get_state(substate_token))
  230. .substates[state_name]
  231. ._file_data
  232. )
  233. file_data = await AppHarness._poll_for_async(get_file_data)
  234. assert isinstance(file_data, dict)
  235. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  236. for exp_name, exp_contents in exp_files.items():
  237. assert normalized_file_data[Path(exp_name).name] == exp_contents
  238. @pytest.mark.parametrize("secondary", [False, True])
  239. def test_clear_files(
  240. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  241. ):
  242. """Select then clear several file uploads and check that they are cleared.
  243. Args:
  244. tmp_path: pytest tmp_path fixture
  245. upload_file: harness for UploadFile app.
  246. driver: WebDriver instance.
  247. secondary: whether to use the secondary upload form.
  248. """
  249. assert upload_file.app_instance is not None
  250. token_input = driver.find_element(By.ID, "token")
  251. assert token_input
  252. # wait for the backend connection to send the token
  253. token = upload_file.poll_for_value(token_input)
  254. assert token is not None
  255. suffix = "_secondary" if secondary else ""
  256. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  257. 1 if secondary else 0
  258. ]
  259. assert upload_box
  260. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  261. assert upload_button
  262. exp_files = {
  263. "test1.txt": "test file contents!",
  264. "test2.txt": "this is test file number 2!",
  265. "reflex.txt": "reflex is awesome!",
  266. }
  267. for exp_name, exp_contents in exp_files.items():
  268. target_file = tmp_path / exp_name
  269. target_file.write_text(exp_contents)
  270. upload_box.send_keys(str(target_file))
  271. time.sleep(0.2)
  272. # check that the selected files are displayed
  273. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  274. assert [Path(name).name for name in selected_files.text.split("\n")] == [
  275. Path(name).name for name in exp_files
  276. ]
  277. clear_button = driver.find_element(By.ID, f"clear_button{suffix}")
  278. assert clear_button
  279. clear_button.click()
  280. # check that the selected files are cleared
  281. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  282. assert selected_files.text == ""
# TODO: add a test for drag and drop of a directory, see
# https://gist.github.com/florentbr/349b1ab024ca9f3de56e6bf8af2ac69e
  285. @pytest.mark.asyncio
  286. async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDriver):
  287. """Submit a large file upload and cancel it.
  288. Args:
  289. tmp_path: pytest tmp_path fixture
  290. upload_file: harness for UploadFile app.
  291. driver: WebDriver instance.
  292. """
  293. assert upload_file.app_instance is not None
  294. token_input = driver.find_element(By.ID, "token")
  295. assert token_input
  296. # wait for the backend connection to send the token
  297. token = upload_file.poll_for_value(token_input)
  298. assert token is not None
  299. state_name = upload_file.get_state_name("_upload_state")
  300. state_full_name = upload_file.get_full_state_name(["_upload_state"])
  301. substate_token = f"{token}_{state_full_name}"
  302. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[1]
  303. upload_button = driver.find_element(By.ID, f"upload_button_secondary")
  304. cancel_button = driver.find_element(By.ID, f"cancel_button_secondary")
  305. exp_name = "large.txt"
  306. target_file = tmp_path / exp_name
  307. with target_file.open("wb") as f:
  308. f.seek(1024 * 1024 * 256)
  309. f.write(b"0")
  310. upload_box.send_keys(str(target_file))
  311. upload_button.click()
  312. await asyncio.sleep(0.3)
  313. cancel_button.click()
  314. # look up the backend state and assert on progress
  315. state = await upload_file.get_state(substate_token)
  316. assert state.substates[state_name].progress_dicts
  317. file_data = state.substates[state_name]._file_data
  318. assert isinstance(file_data, dict)
  319. normalized_file_data = {Path(k).name: v for k, v in file_data.items()}
  320. assert Path(exp_name).name not in normalized_file_data
  321. target_file.unlink()