test_upload.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. """Integration tests for file upload."""
  2. from __future__ import annotations
  3. import asyncio
  4. import time
  5. from typing import Generator
  6. import pytest
  7. from selenium.webdriver.common.by import By
  8. from reflex.testing import AppHarness, WebDriver
def UploadFile():
    """App for testing file uploads.

    The page has two upload forms: a default one, and a "secondary" one
    (upload id ``"secondary"``) whose upload additionally reports progress,
    chains ``chain_event`` and can be cancelled.
    """
    from typing import Dict, List

    import reflex as rx

    class UploadState(rx.State):
        """State recording uploaded file contents and the events that ran."""

        # Uploaded file contents decoded as UTF-8, keyed by file name.
        _file_data: Dict[str, str] = {}
        # Markers appended by upload_progress / chain_event as they run.
        event_order: List[str] = []
        # Every progress payload passed to upload_progress.
        progress_dicts: List[dict] = []

        async def handle_upload(self, files: List[rx.UploadFile]):
            """Store the decoded contents of each uploaded file.

            Args:
                files: the uploaded files.
            """
            for file in files:
                upload_data = await file.read()
                # a missing filename is stored under the empty-string key
                self._file_data[file.filename or ""] = upload_data.decode("utf-8")

        async def handle_upload_secondary(self, files: List[rx.UploadFile]):
            """Store the decoded contents of each file and chain ``chain_event``.

            Args:
                files: the uploaded files.

            Yields:
                The ``chain_event`` handler.
            """
            for file in files:
                upload_data = await file.read()
                # a missing filename is stored under the empty-string key
                self._file_data[file.filename or ""] = upload_data.decode("utf-8")
                yield UploadState.chain_event

        def upload_progress(self, progress):
            """Record a progress payload from the secondary upload.

            Args:
                progress: the progress payload; asserted to be truthy.
            """
            assert progress
            self.event_order.append("upload_progress")
            self.progress_dicts.append(progress)

        def chain_event(self):
            """Record that the chained event ran."""
            self.event_order.append("chain_event")

    def index():
        """Build the page holding the default and secondary upload forms."""
        return rx.vstack(
            # read-only field exposing the client token to the tests
            rx.chakra.input(
                value=UploadState.router.session.client_token,
                is_read_only=True,
                id="token",
            ),
            rx.heading("Default Upload"),
            rx.upload.root(
                rx.vstack(
                    rx.button("Select File"),
                    rx.text("Drag and drop files here or click to select files"),
                ),
            ),
            rx.button(
                "Upload",
                on_click=lambda: UploadState.handle_upload(rx.upload_files()),  # type: ignore
                id="upload_button",
            ),
            rx.box(
                rx.foreach(
                    rx.selected_files,
                    lambda f: rx.text(f, as_="p"),
                ),
                id="selected_files",
            ),
            rx.button(
                "Clear",
                on_click=rx.clear_selected_files,
                id="clear_button",
            ),
            rx.heading("Secondary Upload"),
            rx.upload.root(
                rx.vstack(
                    rx.button("Select File"),
                    rx.text("Drag and drop files here or click to select files"),
                ),
                id="secondary",
            ),
            rx.button(
                "Upload",
                on_click=UploadState.handle_upload_secondary(  # type: ignore
                    rx.upload_files(
                        upload_id="secondary",
                        on_upload_progress=UploadState.upload_progress,
                    ),
                ),
                id="upload_button_secondary",
            ),
            rx.box(
                rx.foreach(
                    rx.selected_files("secondary"),
                    lambda f: rx.text(f, as_="p"),
                ),
                id="selected_files_secondary",
            ),
            rx.button(
                "Clear",
                on_click=rx.clear_selected_files("secondary"),
                id="clear_button_secondary",
            ),
            # one text element per recorded progress payload
            rx.vstack(
                rx.foreach(
                    UploadState.progress_dicts,  # type: ignore
                    lambda d: rx.text(d.to_string()),
                )
            ),
            rx.button(
                "Cancel",
                on_click=rx.cancel_upload("secondary"),
                id="cancel_button_secondary",
            ),
        )

    app = rx.App(state=rx.State)
    app.add_page(index)
  107. @pytest.fixture(scope="module")
  108. def upload_file(tmp_path_factory) -> Generator[AppHarness, None, None]:
  109. """Start UploadFile app at tmp_path via AppHarness.
  110. Args:
  111. tmp_path_factory: pytest tmp_path_factory fixture
  112. Yields:
  113. running AppHarness instance
  114. """
  115. with AppHarness.create(
  116. root=tmp_path_factory.mktemp("upload_file"),
  117. app_source=UploadFile, # type: ignore
  118. ) as harness:
  119. yield harness
  120. @pytest.fixture
  121. def driver(upload_file: AppHarness):
  122. """Get an instance of the browser open to the upload_file app.
  123. Args:
  124. upload_file: harness for DynamicRoute app
  125. Yields:
  126. WebDriver instance.
  127. """
  128. assert upload_file.app_instance is not None, "app is not running"
  129. driver = upload_file.frontend()
  130. try:
  131. yield driver
  132. finally:
  133. driver.quit()
  134. @pytest.mark.parametrize("secondary", [False, True])
  135. @pytest.mark.asyncio
  136. async def test_upload_file(
  137. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  138. ):
  139. """Submit a file upload and check that it arrived on the backend.
  140. Args:
  141. tmp_path: pytest tmp_path fixture
  142. upload_file: harness for UploadFile app.
  143. driver: WebDriver instance.
  144. secondary: whether to use the secondary upload form
  145. """
  146. assert upload_file.app_instance is not None
  147. token_input = driver.find_element(By.ID, "token")
  148. assert token_input
  149. # wait for the backend connection to send the token
  150. token = upload_file.poll_for_value(token_input)
  151. assert token is not None
  152. substate_token = f"{token}_state.upload_state"
  153. suffix = "_secondary" if secondary else ""
  154. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  155. 1 if secondary else 0
  156. ]
  157. assert upload_box
  158. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  159. assert upload_button
  160. exp_name = "test.txt"
  161. exp_contents = "test file contents!"
  162. target_file = tmp_path / exp_name
  163. target_file.write_text(exp_contents)
  164. upload_box.send_keys(str(target_file))
  165. upload_button.click()
  166. # look up the backend state and assert on uploaded contents
  167. async def get_file_data():
  168. return (
  169. (await upload_file.get_state(substate_token))
  170. .substates["upload_state"]
  171. ._file_data
  172. )
  173. file_data = await AppHarness._poll_for_async(get_file_data)
  174. assert isinstance(file_data, dict)
  175. assert file_data[exp_name] == exp_contents
  176. # check that the selected files are displayed
  177. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  178. assert selected_files.text == exp_name
  179. state = await upload_file.get_state(substate_token)
  180. if secondary:
  181. # only the secondary form tracks progress and chain events
  182. assert state.substates["upload_state"].event_order.count("upload_progress") == 1
  183. assert state.substates["upload_state"].event_order.count("chain_event") == 1
  184. @pytest.mark.asyncio
  185. async def test_upload_file_multiple(tmp_path, upload_file: AppHarness, driver):
  186. """Submit several file uploads and check that they arrived on the backend.
  187. Args:
  188. tmp_path: pytest tmp_path fixture
  189. upload_file: harness for UploadFile app.
  190. driver: WebDriver instance.
  191. """
  192. assert upload_file.app_instance is not None
  193. token_input = driver.find_element(By.ID, "token")
  194. assert token_input
  195. # wait for the backend connection to send the token
  196. token = upload_file.poll_for_value(token_input)
  197. assert token is not None
  198. substate_token = f"{token}_state.upload_state"
  199. upload_box = driver.find_element(By.XPATH, "//input[@type='file']")
  200. assert upload_box
  201. upload_button = driver.find_element(By.ID, "upload_button")
  202. assert upload_button
  203. exp_files = {
  204. "test1.txt": "test file contents!",
  205. "test2.txt": "this is test file number 2!",
  206. "reflex.txt": "reflex is awesome!",
  207. }
  208. for exp_name, exp_contents in exp_files.items():
  209. target_file = tmp_path / exp_name
  210. target_file.write_text(exp_contents)
  211. upload_box.send_keys(str(target_file))
  212. time.sleep(0.2)
  213. # check that the selected files are displayed
  214. selected_files = driver.find_element(By.ID, "selected_files")
  215. assert selected_files.text == "\n".join(exp_files)
  216. # do the upload
  217. upload_button.click()
  218. # look up the backend state and assert on uploaded contents
  219. async def get_file_data():
  220. return (
  221. (await upload_file.get_state(substate_token))
  222. .substates["upload_state"]
  223. ._file_data
  224. )
  225. file_data = await AppHarness._poll_for_async(get_file_data)
  226. assert isinstance(file_data, dict)
  227. for exp_name, exp_contents in exp_files.items():
  228. assert file_data[exp_name] == exp_contents
  229. @pytest.mark.parametrize("secondary", [False, True])
  230. def test_clear_files(
  231. tmp_path, upload_file: AppHarness, driver: WebDriver, secondary: bool
  232. ):
  233. """Select then clear several file uploads and check that they are cleared.
  234. Args:
  235. tmp_path: pytest tmp_path fixture
  236. upload_file: harness for UploadFile app.
  237. driver: WebDriver instance.
  238. secondary: whether to use the secondary upload form.
  239. """
  240. assert upload_file.app_instance is not None
  241. token_input = driver.find_element(By.ID, "token")
  242. assert token_input
  243. # wait for the backend connection to send the token
  244. token = upload_file.poll_for_value(token_input)
  245. assert token is not None
  246. suffix = "_secondary" if secondary else ""
  247. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[
  248. 1 if secondary else 0
  249. ]
  250. assert upload_box
  251. upload_button = driver.find_element(By.ID, f"upload_button{suffix}")
  252. assert upload_button
  253. exp_files = {
  254. "test1.txt": "test file contents!",
  255. "test2.txt": "this is test file number 2!",
  256. "reflex.txt": "reflex is awesome!",
  257. }
  258. for exp_name, exp_contents in exp_files.items():
  259. target_file = tmp_path / exp_name
  260. target_file.write_text(exp_contents)
  261. upload_box.send_keys(str(target_file))
  262. time.sleep(0.2)
  263. # check that the selected files are displayed
  264. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  265. assert selected_files.text == "\n".join(exp_files)
  266. clear_button = driver.find_element(By.ID, f"clear_button{suffix}")
  267. assert clear_button
  268. clear_button.click()
  269. # check that the selected files are cleared
  270. selected_files = driver.find_element(By.ID, f"selected_files{suffix}")
  271. assert selected_files.text == ""
  272. # TODO: drag and drop directory
  273. # https://gist.github.com/florentbr/349b1ab024ca9f3de56e6bf8af2ac69e
  274. @pytest.mark.asyncio
  275. async def test_cancel_upload(tmp_path, upload_file: AppHarness, driver: WebDriver):
  276. """Submit a large file upload and cancel it.
  277. Args:
  278. tmp_path: pytest tmp_path fixture
  279. upload_file: harness for UploadFile app.
  280. driver: WebDriver instance.
  281. """
  282. assert upload_file.app_instance is not None
  283. token_input = driver.find_element(By.ID, "token")
  284. assert token_input
  285. # wait for the backend connection to send the token
  286. token = upload_file.poll_for_value(token_input)
  287. assert token is not None
  288. substate_token = f"{token}_state.upload_state"
  289. upload_box = driver.find_elements(By.XPATH, "//input[@type='file']")[1]
  290. upload_button = driver.find_element(By.ID, f"upload_button_secondary")
  291. cancel_button = driver.find_element(By.ID, f"cancel_button_secondary")
  292. exp_name = "large.txt"
  293. target_file = tmp_path / exp_name
  294. with target_file.open("wb") as f:
  295. f.seek(1024 * 1024 * 256)
  296. f.write(b"0")
  297. upload_box.send_keys(str(target_file))
  298. upload_button.click()
  299. await asyncio.sleep(0.3)
  300. cancel_button.click()
  301. # look up the backend state and assert on progress
  302. state = await upload_file.get_state(substate_token)
  303. assert state.substates["upload_state"].progress_dicts
  304. assert exp_name not in state.substates["upload_state"]._file_data
  305. target_file.unlink()