test_storage.py 10.0 KB


  1. import asyncio
  2. import copy
  3. from pathlib import Path
  4. import httpx
  5. import pytest
  6. from nicegui import app, background_tasks, context, core, ui
  7. from nicegui import storage as storage_module
  8. from nicegui.testing import Screen
  9. def test_browser_data_is_stored_in_the_browser(screen: Screen):
  10. @ui.page('/')
  11. def page():
  12. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  13. ui.label().bind_text_from(app.storage.browser, 'count')
  14. @app.get('/count')
  15. def count():
  16. return 'count = ' + str(app.storage.browser['count'])
  17. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  18. screen.open('/')
  19. screen.should_contain('1')
  20. screen.open('/')
  21. screen.should_contain('2')
  22. screen.open('/')
  23. screen.should_contain('3')
  24. screen.open('/count')
  25. screen.should_contain('count = 3') # also works with FastAPI endpoints
  26. def test_browser_storage_supports_asyncio(screen: Screen):
  27. @ui.page('/')
  28. async def page():
  29. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  30. await asyncio.sleep(0.5)
  31. ui.label(app.storage.browser['count'])
  32. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  33. screen.open('/')
  34. screen.switch_to(1)
  35. screen.open('/')
  36. screen.should_contain('2')
  37. screen.switch_to(0)
  38. screen.open('/')
  39. screen.should_contain('3')
  40. def test_browser_storage_modifications_after_page_load_are_forbidden(screen: Screen):
  41. @ui.page('/')
  42. async def page():
  43. await ui.context.client.connected()
  44. try:
  45. app.storage.browser['test'] = 'data'
  46. except TypeError as e:
  47. ui.label(str(e))
  48. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  49. screen.open('/')
  50. screen.should_contain('response to the browser has already been built')
  51. def test_user_storage_modifications(screen: Screen):
  52. @ui.page('/')
  53. async def page(delayed: bool = False):
  54. if delayed:
  55. await ui.context.client.connected()
  56. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  57. ui.label().bind_text_from(app.storage.user, 'count')
  58. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  59. screen.open('/')
  60. screen.should_contain('1')
  61. screen.open('/?delayed=True')
  62. screen.should_contain('2')
  63. screen.open('/')
  64. screen.should_contain('3')
  65. async def test_access_user_storage_from_fastapi(screen: Screen):
  66. @app.get('/api')
  67. def api():
  68. app.storage.user['msg'] = 'yes'
  69. return 'OK'
  70. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  71. screen.open('/')
  72. async with httpx.AsyncClient() as http_client:
  73. response = await http_client.get(f'http://localhost:{Screen.PORT}/api')
  74. assert response.status_code == 200
  75. assert response.text == '"OK"'
  76. await asyncio.sleep(0.5) # wait for storage to be written
  77. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"msg":"yes"}'
  78. def test_access_user_storage_on_interaction(screen: Screen):
  79. @ui.page('/')
  80. async def page():
  81. if 'test_switch' not in app.storage.user:
  82. app.storage.user['test_switch'] = False
  83. ui.switch('switch').bind_value(app.storage.user, 'test_switch')
  84. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  85. screen.open('/')
  86. screen.click('switch')
  87. screen.wait(0.5)
  88. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"test_switch":true}'
  89. def test_access_user_storage_from_button_click_handler(screen: Screen):
  90. @ui.page('/')
  91. async def page():
  92. ui.button('test', on_click=app.storage.user.update(inner_function='works'))
  93. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  94. screen.open('/')
  95. screen.click('test')
  96. screen.wait(1)
  97. assert \
  98. next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"inner_function":"works"}'
  99. async def test_access_user_storage_from_background_task(screen: Screen):
  100. @ui.page('/')
  101. def page():
  102. async def subtask():
  103. await asyncio.sleep(0.1)
  104. app.storage.user['subtask'] = 'works'
  105. background_tasks.create(subtask())
  106. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  107. screen.open('/')
  108. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"subtask":"works"}'
  109. def test_user_and_general_storage_is_persisted(screen: Screen):
  110. @ui.page('/')
  111. def page():
  112. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  113. app.storage.general['count'] = app.storage.general.get('count', 0) + 1
  114. ui.label(f'user: {app.storage.user["count"]}')
  115. ui.label(f'general: {app.storage.general["count"]}')
  116. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  117. screen.open('/')
  118. screen.open('/')
  119. screen.open('/')
  120. screen.should_contain('user: 3')
  121. screen.should_contain('general: 3')
  122. screen.selenium.delete_all_cookies()
  123. screen.open('/')
  124. screen.should_contain('user: 1')
  125. screen.should_contain('general: 4')
  126. def test_rapid_storage(screen: Screen):
  127. # https://github.com/zauberzeug/nicegui/issues/1099
  128. ui.button('test', on_click=lambda: (
  129. app.storage.general.update(one=1),
  130. app.storage.general.update(two=2),
  131. app.storage.general.update(three=3),
  132. ))
  133. screen.open('/')
  134. screen.click('test')
  135. screen.wait(0.5)
  136. assert Path('.nicegui', 'storage-general.json').read_text(encoding='utf-8') == '{"one":1,"two":2,"three":3}'
  137. def test_tab_storage_is_local(screen: Screen):
  138. @ui.page('/')
  139. async def page():
  140. await context.client.connected()
  141. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  142. ui.label().bind_text_from(app.storage.tab, 'count')
  143. screen.open('/')
  144. screen.should_contain('1')
  145. screen.open('/')
  146. screen.should_contain('2')
  147. screen.switch_to(1)
  148. screen.open('/')
  149. screen.should_contain('1')
  150. screen.switch_to(0)
  151. screen.open('/')
  152. screen.should_contain('3')
  153. def test_tab_storage_is_auto_removed(screen: Screen):
  154. storage_module.PURGE_INTERVAL = 0.1
  155. app.storage.max_tab_storage_age = 0.5
  156. @ui.page('/')
  157. async def page():
  158. await context.client.connected()
  159. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  160. ui.label().bind_text_from(app.storage.tab, 'count')
  161. screen.open('/')
  162. screen.should_contain('1')
  163. screen.open('/')
  164. screen.should_contain('2')
  165. screen.wait(1)
  166. screen.open('/')
  167. screen.should_contain('1')
  168. def test_clear_tab_storage(screen: Screen):
  169. storage_module.PURGE_INTERVAL = 60
  170. @ui.page('/')
  171. async def page():
  172. await context.client.connected()
  173. app.storage.tab['test'] = '123'
  174. ui.button('clear', on_click=app.storage.clear)
  175. screen.open('/')
  176. screen.should_contain('clear')
  177. tab_storages = app.storage._tabs # pylint: disable=protected-access
  178. assert len(tab_storages) == 1
  179. assert next(iter(tab_storages.values())) == {'test': '123'}
  180. screen.click('clear')
  181. screen.wait(0.5)
  182. assert not tab_storages
  183. def test_client_storage(screen: Screen):
  184. def increment():
  185. app.storage.client['counter'] = app.storage.client['counter'] + 1
  186. @ui.page('/')
  187. def page():
  188. app.storage.client['counter'] = 123
  189. ui.button('Increment').on_click(increment)
  190. ui.label().bind_text(app.storage.client, 'counter')
  191. screen.open('/')
  192. screen.should_contain('123')
  193. screen.click('Increment')
  194. screen.wait_for('124')
  195. screen.switch_to(1)
  196. screen.open('/')
  197. screen.should_contain('123')
  198. screen.switch_to(0)
  199. screen.should_contain('124')
  200. def test_clear_client_storage(screen: Screen):
  201. with pytest.raises(RuntimeError): # no context (auto index)
  202. app.storage.client.clear()
  203. @ui.page('/')
  204. def page():
  205. app.storage.client['counter'] = 123
  206. app.storage.client.clear()
  207. assert app.storage.client == {}
  208. screen.open('/')
  209. def test_deepcopy(screen: Screen):
  210. # https://github.com/zauberzeug/nicegui/issues/3023
  211. @ui.page('/')
  212. def page():
  213. app.storage.general['a'] = {'b': 0}
  214. copy.deepcopy(app.storage.general['a'])
  215. ui.label('Loaded')
  216. screen.open('/')
  217. screen.should_contain('Loaded')
  218. screen.wait(0.5)
  219. assert Path('.nicegui', 'storage-general.json').read_text(encoding='utf-8') == '{"a":{"b":0}}'
  220. def test_missing_storage_secret(screen: Screen):
  221. @ui.page('/')
  222. def page():
  223. ui.label(app.storage.user.get('message', 'no message'))
  224. core.app.user_middleware.clear() # remove the session middlewares added by prepare_simulation by default
  225. screen.open('/')
  226. screen.assert_py_logger('ERROR', 'app.storage.user needs a storage_secret passed in ui.run()')
  227. def test_storage_access_in_on_connect(screen: Screen):
  228. @ui.page('/')
  229. def root():
  230. app.storage.user['value'] = 'Test'
  231. app.on_connect(lambda: ui.label(app.storage.user.get('value')))
  232. screen.ui_run_kwargs['storage_secret'] = 'secret'
  233. screen.open('/')
  234. screen.should_contain('Test')
  235. def test_storage_access_in_binding_function(screen: Screen):
  236. model = {'name': 'John'}
  237. @ui.page('/')
  238. def index():
  239. def f(v):
  240. return v + app.storage.user.get('surname', '')
  241. ui.label().bind_text_from(model, 'name', backward=f)
  242. screen.ui_run_kwargs['storage_secret'] = 'secret'
  243. screen.open('/')
  244. screen.assert_py_logger('ERROR', 'app.storage.user can only be used within a UI context')
  245. def test_client_storage_holds_non_serializable_objects(screen: Screen):
  246. @ui.page('/')
  247. def page():
  248. ui.button('Update storage', on_click=lambda: app.storage.client.update(x=len))
  249. screen.open('/')
  250. screen.click('Update storage')
  251. screen.wait(0.5)
  252. def test_tab_storage_holds_non_serializable_objects(screen: Screen):
  253. @ui.page('/')
  254. def page():
  255. ui.button('Update storage', on_click=lambda: app.storage.tab.update(x=len))
  256. screen.open('/')
  257. screen.click('Update storage')
  258. screen.wait(0.5)