test_upload.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. """Integration tests for file upload."""
  2. from __future__ import annotations
  3. import asyncio
  4. import time
  5. from typing import Generator
  6. import pytest
  7. from selenium.webdriver.common.by import By
  8. from reflex.testing import AppHarness, WebDriver
  9. def UploadFile():
  10. """App for testing dynamic routes."""
  11. import reflex as rx
  12. class UploadState(rx.State):
  13. _file_data: dict[str, str] = {}
  14. event_order: list[str] = []
  15. progress_dicts: list[dict] = []
  16. async def handle_upload(self, files: list[rx.UploadFile]):
  17. for file in files:
  18. upload_data = await file.read()
  19. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  20. async def handle_upload_secondary(self, files: list[rx.UploadFile]):
  21. for file in files:
  22. upload_data = await file.read()
  23. self._file_data[file.filename or ""] = upload_data.decode("utf-8")
  24. yield UploadState.chain_event
  25. def upload_progress(self, progress):
  26. assert progress
  27. self.event_order.append("upload_progress")
  28. self.progress_dicts.append(progress)
  29. def chain_event(self):
  30. self.event_order.append("chain_event")
  31. def index():
  32. return rx.vstack(
  33. rx.chakra.input(
  34. value=UploadState.router.session.client_token,
  35. is_read_only=True,
  36. id="token",
  37. ),
  38. rx.heading("Default Upload"),
  39. rx.upload(
  40. rx.vstack(
  41. rx.button("Select File"),
  42. rx.text("Drag and drop files here or click to select files"),
  43. ),
  44. ),
  45. rx.button(
  46. "Upload",
  47. on_click=lambda: UploadState.handle_upload(rx.upload_files()), # type: ignore
  48. id="upload_button",
  49. ),
  50. rx.box(
  51. rx.foreach(
  52. rx.selected_files,
  53. lambda f: rx.text(f, as_="p"),
  54. ),
  55. id="selected_files",
  56. ),
  57. rx.button(
  58. "Clear",
  59. on_click=rx.clear_selected_files,
  60. id="clear_button",
  61. ),
  62. rx.heading("Secondary Upload"),
  63. rx.upload(
  64. rx.vstack(
  65. rx.button("Select File"),
  66. rx.text("Drag and drop files here or click to select files"),
  67. ),
  68. id="secondary",
  69. ),
  70. rx.button(
  71. "Upload",
  72. on_click=UploadState.handle_upload_secondary( # type: ignore
  73. rx.upload_files(
  74. upload_id="secondary",
  75. on_upload_progress=UploadState.upload_progress,
  76. ),
  77. ),
  78. id="upload_button_secondary",
  79. ),
  80. rx.box(
  81. rx.foreach(
  82. rx.selected_files("secondary"),
  83. lambda f: rx.text(f, as_="p"),
  84. ),
  85. id="selected_files_secondary",
  86. ),
  87. rx.button(
  88. "Clear",
  89. on_click=rx.clear_selected_files("secondary"),
  90. id="clear_button_secondary",
  91. ),
  92. rx.vstack(
  93. rx.foreach(
  94. UploadState.progress_dicts, # type: ignore
  95. lambda d: rx.text(d.to_string()),
  96. )
  97. ),
  98. rx.button(
  99. "Cancel",
  100. on_click=rx.cancel_upload("secondary"),
  101. id="cancel_button_secondary",
  102. ),
  103. )
  104. app = rx.App(state=rx.State)
  105. app.add_page(index)
  106. @pytest.fixture(scope="session")
  107. def upload_file(tmp_path_factory) -> Generator[AppHarness, None, None]:
  108. """Start UploadFile app at tmp_path via AppHarness.
  109. Args:
  110. tmp_path_factory: pytest tmp_path_factory fixture
  111. Yields:
  112. running AppHarness instance
  113. """
  114. with AppHarness.create(
  115. root=tmp_path_factory.mktemp("upload_file"),
  116. app_source=UploadFile, # type: ignore
  117. ) as harness:
  118. yield harness
  119. @pytest.fixture
  120. def driver(upload_file: AppHarness):
  121. """Get an instance of the browser open to the upload_file app.
  122. Args:
  123. upload_file: harness for DynamicRoute app
  124. Yields:
  125. WebDriver instance.
  126. """
  127. assert upload_file.app_instance is not None, "app is not running"
  128. driver = upload_file.frontend()
  129. try:
  130. yield driver
  131. finally:
  132. driver.quit()
  133. @pytest.mark.parametrize("secondary", [False, True])
  134. @pytest.mark.asyncio
  135. async def test_upload_file(
  136. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  137. ):
  138. """Submit a file upload and check that it arrived on the backend.
  139. Args:
  140. tmp_path: pytest tmp_path fixture
  141. upload_file: harness for UploadFile app.
  142. driver: WebDriver instance.
  143. secondary: whether to use the secondary upload form
  144. """
  145. assert upload_file.app_instance is not None
  146. token_input = driver.find_element(By.ID, "token")
  147. assert token_input
  148. # wait for the backend connection to send the token
  149. token = upload_file.poll_for_value(token_input)
  150. assert token is not None
  151. substate_token = f"{token}_state.upload_state"
  152. suffix = "_secondary" if secondary else ""
  153. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  154. 1 if secondary else 0
  155. ]
  156. assert upload_box
  157. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  158. assert upload_button
  159. exp_name = "test.txt"
  160. exp_contents = "test file contents!"
  161. target_file = tmp_path / exp_name
  162. target_file.write_text(exp_contents)
  163. upload_box.send_keys(str(target_file))
  164. upload_button.click()
  165. # look up the backend state and assert on uploaded contents
  166. async def get_file_data():
  167. return (
  168. (await upload_file.get_state(substate_token))
  169. .substates["upload_state"]
  170. ._file_data
  171. )
  172. file_data = await AppHarness._poll_for_async(get_file_data)
  173. assert isinstance(file_data, dict)
  174. assert file_data[exp_name] == exp_contents
  175. # check that the selected files are displayed
  176. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  177. assert selected_files.text == exp_name
  178. state = await upload_file.get_state(substate_token)
  179. if secondary:
  180. # only the secondary form tracks progress and chain events
  181. assert state.substates["upload_state"].event_order.count("upload_progress") == 1
  182. assert state.substates["upload_state"].event_order.count("chain_event") == 1
  183. @pytest.mark.asyncio
  184. async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
  185. """Submit several file uploads and check that they arrived on the backend.
  186. Args:
  187. tmp_path: pytest tmp_path fixture
  188. upload_file: harness for UploadFile app.
  189. driver: WebDriver instance.
  190. """
  191. assert upload_file.app_instance is not None
  192. token_input = driver.find_element(By.ID, "token")
  193. assert token_input
  194. # wait for the backend connection to send the token
  195. token = upload_file.poll_for_value(token_input)
  196. assert token is not None
  197. substate_token = f"{token}_state.upload_state"
  198. upload_box = driver.find_element(By.XPATH, "//input[@type='file']")
  199. assert upload_box
  200. upload_button = driver.find_element(By.ID, "upload_button")
  201. assert upload_button
  202. exp_files = {
  203. "test1.txt": "test file contents!",
  204. "test2.txt": "this is test file number 2!",
  205. "reflex.txt": "reflex is awesome!",
  206. }
  207. for exp_name, exp_contents in exp_files.items():
  208. target_file = tmp_path / exp_name
  209. target_file.write_text(exp_contents)
  210. upload_box.send_keys(str(target_file))
  211. time.sleep(0.2)
  212. # check that the selected files are displayed
  213. selected_files = driver.find_element(By.ID, "selected_files")
  214. assert selected_files.text == "\n".join(exp_files)
  215. # do the upload
  216. upload_button.click()
  217. # look up the backend state and assert on uploaded contents
  218. async def get_file_data():
  219. return (
  220. (await upload_file.get_state(substate_token))
  221. .substates["upload_state"]
  222. ._file_data
  223. )
  224. file_data = await AppHarness._poll_for_async(get_file_data)
  225. assert isinstance(file_data, dict)
  226. for exp_name, exp_contents in exp_files.items():
  227. assert file_data[exp_name] == exp_contents
  228. @pytest.mark.parametrize("secondary", [False, True])
  229. def test_clear_files(
  230. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  231. ):
  232. """Select then clear several file uploads and check that they are cleared.
  233. Args:
  234. tmp_path: pytest tmp_path fixture
  235. upload_file: harness for UploadFile app.
  236. driver: WebDriver instance.
  237. secondary: whether to use the secondary upload form.
  238. """
  239. assert upload_file.app_instance is not None
  240. token_input = driver.find_element(By.ID, "token")
  241. assert token_input
  242. # wait for the backend connection to send the token
  243. token = upload_file.poll_for_value(token_input)
  244. assert token is not None
  245. suffix = "_secondary" if secondary else ""
  246. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  247. 1 if secondary else 0
  248. ]
  249. assert upload_box
  250. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  251. assert upload_button
  252. exp_files = {
  253. "test1.txt": "test file contents!",
  254. "test2.txt": "this is test file number 2!",
  255. "reflex.txt": "reflex is awesome!",
  256. }
  257. for exp_name, exp_contents in exp_files.items():
  258. target_file = tmp_path / exp_name
  259. target_file.write_text(exp_contents)
  260. upload_box.send_keys(str(target_file))
  261. time.sleep(0.2)
  262. # check that the selected files are displayed
  263. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  264. assert selected_files.text == "\n".join(exp_files)
  265. clear_button = driver.find_element(By.ID, f"clear_button{suffix}")
  266. assert clear_button
  267. clear_button.click()
  268. # check that the selected files are cleared
  269. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  270. assert selected_files.text == ""
  271. # TODO: drag and drop directory
  272. # https://gist.github.com/florentbr/349b1ab024ca9f3de56e6bf8af2ac69e
  273. @pytest.mark.asyncio
  274. async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDriver):
  275. """Submit a large file upload and cancel it.
  276. Args:
  277. tmp_path: pytest tmp_path fixture
  278. upload_file: harness for UploadFile app.
  279. driver: WebDriver instance.
  280. """
  281. assert upload_file.app_instance is not None
  282. token_input = driver.find_element(By.ID, "token")
  283. assert token_input
  284. # wait for the backend connection to send the token
  285. token = upload_file.poll_for_value(token_input)
  286. assert token is not None
  287. substate_token = f"{token}_state.upload_state"
  288. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[1]
  289. upload_button = driver.find_element(By.ID, f"upload_button_secondary")
  290. cancel_button = driver.find_element(By.ID, f"cancel_button_secondary")
  291. exp_name = "large.txt"
  292. target_file = tmp_path / exp_name
  293. with target_file.open("wb") as f:
  294. f.seek(1024 * 1024 * 256)
  295. f.write(b"0")
  296. upload_box.send_keys(str(target_file))
  297. upload_button.click()
  298. await asyncio.sleep(0.3)
  299. cancel_button.click()
  300. # look up the backend state and assert on progress
  301. state = await upload_file.get_state(substate_token)
  302. assert state.substates["upload_state"].progress_dicts
  303. assert exp_name not in state.substates["upload_state"]._file_data
  304. target_file.unlink()