1
0

test_storage.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. import asyncio
  2. import copy
  3. import time
  4. from pathlib import Path
  5. import httpx
  6. import pytest
  7. from nicegui import app, background_tasks, context, core, ui
  8. from nicegui import storage as storage_module
  9. from nicegui.testing import Screen
  10. def test_browser_data_is_stored_in_the_browser(screen: Screen):
  11. @ui.page('/')
  12. def page():
  13. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  14. ui.label().bind_text_from(app.storage.browser, 'count')
  15. @app.get('/count')
  16. def count():
  17. return 'count = ' + str(app.storage.browser['count'])
  18. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  19. screen.open('/')
  20. screen.should_contain('1')
  21. screen.open('/')
  22. screen.should_contain('2')
  23. screen.open('/')
  24. screen.should_contain('3')
  25. screen.open('/count')
  26. screen.should_contain('count = 3') # also works with FastAPI endpoints
  27. def test_browser_storage_supports_asyncio(screen: Screen):
  28. @ui.page('/')
  29. async def page():
  30. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  31. await asyncio.sleep(0.5)
  32. ui.label(app.storage.browser['count'])
  33. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  34. screen.open('/')
  35. screen.switch_to(1)
  36. screen.open('/')
  37. screen.should_contain('2')
  38. screen.switch_to(0)
  39. screen.open('/')
  40. screen.should_contain('3')
  41. def test_browser_storage_modifications_after_page_load_are_forbidden(screen: Screen):
  42. @ui.page('/')
  43. async def page():
  44. await ui.context.client.connected()
  45. try:
  46. app.storage.browser['test'] = 'data'
  47. except TypeError as e:
  48. ui.label(str(e))
  49. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  50. screen.open('/')
  51. screen.should_contain('response to the browser has already been built')
  52. def test_user_storage_modifications(screen: Screen):
  53. @ui.page('/')
  54. async def page(delayed: bool = False):
  55. if delayed:
  56. await ui.context.client.connected()
  57. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  58. ui.label().bind_text_from(app.storage.user, 'count')
  59. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  60. screen.open('/')
  61. screen.should_contain('1')
  62. screen.open('/?delayed=True')
  63. screen.should_contain('2')
  64. screen.open('/')
  65. screen.should_contain('3')
  66. def test_access_user_storage_from_fastapi(screen: Screen):
  67. @app.get('/api')
  68. def api():
  69. app.storage.user['msg'] = 'yes'
  70. return 'OK'
  71. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  72. screen.open('/')
  73. with httpx.Client() as http_client:
  74. response = http_client.get(f'http://localhost:{Screen.PORT}/api')
  75. assert response.status_code == 200
  76. assert response.text == '"OK"'
  77. time.sleep(0.5) # wait for storage to be written
  78. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"msg":"yes"}'
  79. def test_access_user_storage_on_interaction(screen: Screen):
  80. @ui.page('/')
  81. async def page():
  82. if 'test_switch' not in app.storage.user:
  83. app.storage.user['test_switch'] = False
  84. ui.switch('switch').bind_value(app.storage.user, 'test_switch')
  85. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  86. screen.open('/')
  87. screen.click('switch')
  88. screen.wait(0.5)
  89. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"test_switch":true}'
  90. def test_access_user_storage_from_button_click_handler(screen: Screen):
  91. @ui.page('/')
  92. async def page():
  93. ui.button('test', on_click=app.storage.user.update(inner_function='works'))
  94. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  95. screen.open('/')
  96. screen.click('test')
  97. screen.wait(1)
  98. assert \
  99. next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"inner_function":"works"}'
  100. def test_access_user_storage_from_background_task(screen: Screen):
  101. @ui.page('/')
  102. def page():
  103. async def subtask():
  104. await asyncio.sleep(0.1)
  105. app.storage.user['subtask'] = 'works'
  106. background_tasks.create(subtask())
  107. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  108. screen.open('/')
  109. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text(encoding='utf-8') == '{"subtask":"works"}'
  110. def test_user_and_general_storage_is_persisted(screen: Screen):
  111. @ui.page('/')
  112. def page():
  113. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  114. app.storage.general['count'] = app.storage.general.get('count', 0) + 1
  115. ui.label(f'user: {app.storage.user["count"]}')
  116. ui.label(f'general: {app.storage.general["count"]}')
  117. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  118. screen.open('/')
  119. screen.open('/')
  120. screen.open('/')
  121. screen.should_contain('user: 3')
  122. screen.should_contain('general: 3')
  123. screen.selenium.delete_all_cookies()
  124. screen.open('/')
  125. screen.should_contain('user: 1')
  126. screen.should_contain('general: 4')
  127. def test_rapid_storage(screen: Screen):
  128. # https://github.com/zauberzeug/nicegui/issues/1099
  129. ui.button('test', on_click=lambda: (
  130. app.storage.general.update(one=1),
  131. app.storage.general.update(two=2),
  132. app.storage.general.update(three=3),
  133. ))
  134. screen.open('/')
  135. screen.click('test')
  136. screen.wait(0.5)
  137. assert Path('.nicegui', 'storage-general.json').read_text(encoding='utf-8') == '{"one":1,"two":2,"three":3}'
  138. def test_tab_storage_is_local(screen: Screen):
  139. @ui.page('/')
  140. async def page():
  141. await context.client.connected()
  142. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  143. ui.label().bind_text_from(app.storage.tab, 'count')
  144. screen.open('/')
  145. screen.should_contain('1')
  146. screen.open('/')
  147. screen.should_contain('2')
  148. screen.switch_to(1)
  149. screen.open('/')
  150. screen.should_contain('1')
  151. screen.switch_to(0)
  152. screen.open('/')
  153. screen.should_contain('3')
  154. def test_tab_storage_is_auto_removed(screen: Screen):
  155. storage_module.PURGE_INTERVAL = 0.1
  156. app.storage.max_tab_storage_age = 0.5
  157. @ui.page('/')
  158. async def page():
  159. await context.client.connected()
  160. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  161. ui.label().bind_text_from(app.storage.tab, 'count')
  162. screen.open('/')
  163. screen.should_contain('1')
  164. screen.open('/')
  165. screen.should_contain('2')
  166. screen.wait(1)
  167. screen.open('/')
  168. screen.should_contain('1')
  169. def test_clear_tab_storage(screen: Screen):
  170. storage_module.PURGE_INTERVAL = 60
  171. @ui.page('/')
  172. async def page():
  173. await context.client.connected()
  174. app.storage.tab['test'] = '123'
  175. ui.button('clear', on_click=app.storage.clear)
  176. screen.open('/')
  177. screen.should_contain('clear')
  178. tab_storages = app.storage._tabs # pylint: disable=protected-access
  179. assert len(tab_storages) == 1
  180. assert next(iter(tab_storages.values())) == {'test': '123'}
  181. screen.click('clear')
  182. screen.wait(0.5)
  183. assert not tab_storages
  184. def test_client_storage(screen: Screen):
  185. def increment():
  186. app.storage.client['counter'] = app.storage.client['counter'] + 1
  187. @ui.page('/')
  188. def page():
  189. app.storage.client['counter'] = 123
  190. ui.button('Increment').on_click(increment)
  191. ui.label().bind_text(app.storage.client, 'counter')
  192. screen.open('/')
  193. screen.should_contain('123')
  194. screen.click('Increment')
  195. screen.wait_for('124')
  196. screen.switch_to(1)
  197. screen.open('/')
  198. screen.should_contain('123')
  199. screen.switch_to(0)
  200. screen.should_contain('124')
  201. def test_clear_client_storage(screen: Screen):
  202. with pytest.raises(RuntimeError): # no context (auto index)
  203. app.storage.client.clear()
  204. @ui.page('/')
  205. def page():
  206. app.storage.client['counter'] = 123
  207. app.storage.client.clear()
  208. assert app.storage.client == {}
  209. screen.open('/')
  210. def test_deepcopy(screen: Screen):
  211. # https://github.com/zauberzeug/nicegui/issues/3023
  212. @ui.page('/')
  213. def page():
  214. app.storage.general['a'] = {'b': 0}
  215. copy.deepcopy(app.storage.general['a'])
  216. ui.label('Loaded')
  217. screen.open('/')
  218. screen.should_contain('Loaded')
  219. screen.wait(0.5)
  220. assert Path('.nicegui', 'storage-general.json').read_text(encoding='utf-8') == '{"a":{"b":0}}'
  221. def test_missing_storage_secret(screen: Screen):
  222. @ui.page('/')
  223. def page():
  224. ui.label(app.storage.user.get('message', 'no message'))
  225. core.app.user_middleware.clear() # remove the session middlewares added by prepare_simulation by default
  226. screen.open('/')
  227. screen.assert_py_logger('ERROR', 'app.storage.user needs a storage_secret passed in ui.run()')
  228. def test_storage_access_in_on_connect(screen: Screen):
  229. @ui.page('/')
  230. def root():
  231. app.storage.user['value'] = 'Test'
  232. app.on_connect(lambda: ui.label(app.storage.user.get('value')))
  233. screen.ui_run_kwargs['storage_secret'] = 'secret'
  234. screen.open('/')
  235. screen.should_contain('Test')
  236. def test_storage_access_in_binding_function(screen: Screen):
  237. model = {'name': 'John'}
  238. @ui.page('/')
  239. def index():
  240. def f(v):
  241. return v + app.storage.user.get('surname', '')
  242. ui.label().bind_text_from(model, 'name', backward=f)
  243. screen.ui_run_kwargs['storage_secret'] = 'secret'
  244. screen.open('/')
  245. screen.assert_py_logger('ERROR', 'app.storage.user can only be used within a UI context')
  246. def test_client_storage_holds_non_serializable_objects(screen: Screen):
  247. @ui.page('/')
  248. def page():
  249. ui.button('Update storage', on_click=lambda: app.storage.client.update(x=len))
  250. screen.open('/')
  251. screen.click('Update storage')
  252. screen.wait(0.5)
  253. def test_tab_storage_holds_non_serializable_objects(screen: Screen):
  254. @ui.page('/')
  255. def page():
  256. ui.button('Update storage', on_click=lambda: app.storage.tab.update(x=len))
  257. screen.open('/')
  258. screen.click('Update storage')
  259. screen.wait(0.5)