test_storage.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import asyncio
  2. import copy
  3. from pathlib import Path
  4. import httpx
  5. import pytest
  6. from nicegui import app, background_tasks, context, core, ui
  7. from nicegui import storage as storage_module
  8. from nicegui.testing import Screen
  9. def test_browser_data_is_stored_in_the_browser(screen: Screen):
  10. @ui.page('/')
  11. def page():
  12. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  13. ui.label().bind_text_from(app.storage.browser, 'count')
  14. @app.get('/count')
  15. def count():
  16. return 'count = ' + str(app.storage.browser['count'])
  17. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  18. screen.open('/')
  19. screen.should_contain('1')
  20. screen.open('/')
  21. screen.should_contain('2')
  22. screen.open('/')
  23. screen.should_contain('3')
  24. screen.open('/count')
  25. screen.should_contain('count = 3') # also works with FastAPI endpoints
  26. def test_browser_storage_supports_asyncio(screen: Screen):
  27. @ui.page('/')
  28. async def page():
  29. app.storage.browser['count'] = app.storage.browser.get('count', 0) + 1
  30. await asyncio.sleep(0.5)
  31. ui.label(app.storage.browser['count'])
  32. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  33. screen.open('/')
  34. screen.switch_to(1)
  35. screen.open('/')
  36. screen.should_contain('2')
  37. screen.switch_to(0)
  38. screen.open('/')
  39. screen.should_contain('3')
  40. def test_browser_storage_modifications_after_page_load_are_forbidden(screen: Screen):
  41. @ui.page('/')
  42. async def page():
  43. await ui.context.client.connected()
  44. try:
  45. app.storage.browser['test'] = 'data'
  46. except TypeError as e:
  47. ui.label(str(e))
  48. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  49. screen.open('/')
  50. screen.should_contain('response to the browser has already been built')
  51. def test_user_storage_modifications(screen: Screen):
  52. @ui.page('/')
  53. async def page(delayed: bool = False):
  54. if delayed:
  55. await ui.context.client.connected()
  56. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  57. ui.label().bind_text_from(app.storage.user, 'count')
  58. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  59. screen.open('/')
  60. screen.should_contain('1')
  61. screen.open('/?delayed=True')
  62. screen.should_contain('2')
  63. screen.open('/')
  64. screen.should_contain('3')
  65. async def test_access_user_storage_from_fastapi(screen: Screen):
  66. @app.get('/api')
  67. def api():
  68. app.storage.user['msg'] = 'yes'
  69. return 'OK'
  70. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  71. screen.open('/')
  72. async with httpx.AsyncClient() as http_client:
  73. response = await http_client.get(f'http://localhost:{Screen.PORT}/api')
  74. assert response.status_code == 200
  75. assert response.text == '"OK"'
  76. await asyncio.sleep(0.5) # wait for storage to be written
  77. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"msg":"yes"}'
  78. def test_access_user_storage_on_interaction(screen: Screen):
  79. @ui.page('/')
  80. async def page():
  81. if 'test_switch' not in app.storage.user:
  82. app.storage.user['test_switch'] = False
  83. ui.switch('switch').bind_value(app.storage.user, 'test_switch')
  84. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  85. screen.open('/')
  86. screen.click('switch')
  87. screen.wait(0.5)
  88. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"test_switch":true}'
  89. def test_access_user_storage_from_button_click_handler(screen: Screen):
  90. @ui.page('/')
  91. async def page():
  92. ui.button('test', on_click=app.storage.user.update(inner_function='works'))
  93. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  94. screen.open('/')
  95. screen.click('test')
  96. screen.wait(1)
  97. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"inner_function":"works"}'
  98. async def test_access_user_storage_from_background_task(screen: Screen):
  99. @ui.page('/')
  100. def page():
  101. async def subtask():
  102. await asyncio.sleep(0.1)
  103. app.storage.user['subtask'] = 'works'
  104. background_tasks.create(subtask())
  105. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  106. screen.open('/')
  107. assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"subtask":"works"}'
  108. def test_user_and_general_storage_is_persisted(screen: Screen):
  109. @ui.page('/')
  110. def page():
  111. app.storage.user['count'] = app.storage.user.get('count', 0) + 1
  112. app.storage.general['count'] = app.storage.general.get('count', 0) + 1
  113. ui.label(f'user: {app.storage.user["count"]}')
  114. ui.label(f'general: {app.storage.general["count"]}')
  115. screen.ui_run_kwargs['storage_secret'] = 'just a test'
  116. screen.open('/')
  117. screen.open('/')
  118. screen.open('/')
  119. screen.should_contain('user: 3')
  120. screen.should_contain('general: 3')
  121. screen.selenium.delete_all_cookies()
  122. screen.open('/')
  123. screen.should_contain('user: 1')
  124. screen.should_contain('general: 4')
  125. def test_rapid_storage(screen: Screen):
  126. # https://github.com/zauberzeug/nicegui/issues/1099
  127. ui.button('test', on_click=lambda: (
  128. app.storage.general.update(one=1),
  129. app.storage.general.update(two=2),
  130. app.storage.general.update(three=3),
  131. ))
  132. screen.open('/')
  133. screen.click('test')
  134. screen.wait(0.5)
  135. assert Path('.nicegui', 'storage-general.json').read_text('utf-8') == '{"one":1,"two":2,"three":3}'
  136. def test_tab_storage_is_local(screen: Screen):
  137. @ui.page('/')
  138. async def page():
  139. await context.client.connected()
  140. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  141. ui.label().bind_text_from(app.storage.tab, 'count')
  142. screen.open('/')
  143. screen.should_contain('1')
  144. screen.open('/')
  145. screen.should_contain('2')
  146. screen.switch_to(1)
  147. screen.open('/')
  148. screen.should_contain('1')
  149. screen.switch_to(0)
  150. screen.open('/')
  151. screen.should_contain('3')
  152. def test_tab_storage_is_auto_removed(screen: Screen):
  153. storage_module.PURGE_INTERVAL = 0.1
  154. app.storage.max_tab_storage_age = 0.5
  155. @ui.page('/')
  156. async def page():
  157. await context.client.connected()
  158. app.storage.tab['count'] = app.storage.tab.get('count', 0) + 1
  159. ui.label().bind_text_from(app.storage.tab, 'count')
  160. screen.open('/')
  161. screen.should_contain('1')
  162. screen.open('/')
  163. screen.should_contain('2')
  164. screen.wait(1)
  165. screen.open('/')
  166. screen.should_contain('1')
  167. def test_clear_tab_storage(screen: Screen):
  168. storage_module.PURGE_INTERVAL = 60
  169. @ui.page('/')
  170. async def page():
  171. await context.client.connected()
  172. app.storage.tab['test'] = '123'
  173. ui.button('clear', on_click=app.storage.clear)
  174. screen.open('/')
  175. screen.should_contain('clear')
  176. tab_storages = app.storage._tabs # pylint: disable=protected-access
  177. assert len(tab_storages) == 1
  178. assert next(iter(tab_storages.values())) == {'test': '123'}
  179. screen.click('clear')
  180. screen.wait(0.5)
  181. assert not tab_storages
  182. def test_client_storage(screen: Screen):
  183. def increment():
  184. app.storage.client['counter'] = app.storage.client['counter'] + 1
  185. @ui.page('/')
  186. def page():
  187. app.storage.client['counter'] = 123
  188. ui.button('Increment').on_click(increment)
  189. ui.label().bind_text(app.storage.client, 'counter')
  190. screen.open('/')
  191. screen.should_contain('123')
  192. screen.click('Increment')
  193. screen.wait_for('124')
  194. screen.switch_to(1)
  195. screen.open('/')
  196. screen.should_contain('123')
  197. screen.switch_to(0)
  198. screen.should_contain('124')
  199. def test_clear_client_storage(screen: Screen):
  200. with pytest.raises(RuntimeError): # no context (auto index)
  201. app.storage.client.clear()
  202. @ui.page('/')
  203. def page():
  204. app.storage.client['counter'] = 123
  205. app.storage.client.clear()
  206. assert app.storage.client == {}
  207. screen.open('/')
  208. def test_deepcopy(screen: Screen):
  209. # https://github.com/zauberzeug/nicegui/issues/3023
  210. @ui.page('/')
  211. def page():
  212. app.storage.general['a'] = {'b': 0}
  213. copy.deepcopy(app.storage.general['a'])
  214. ui.label('Loaded')
  215. screen.open('/')
  216. screen.should_contain('Loaded')
  217. screen.wait(0.5)
  218. assert Path('.nicegui', 'storage-general.json').read_text('utf-8') == '{"a":{"b":0}}'
  219. def test_missing_storage_secret(screen: Screen):
  220. @ui.page('/')
  221. def page():
  222. ui.label(app.storage.user.get('message', 'no message'))
  223. core.app.user_middleware.clear() # remove the session middlewares added by prepare_simulation by default
  224. screen.open('/')
  225. screen.assert_py_logger('ERROR', 'app.storage.user needs a storage_secret passed in ui.run()')
  226. def test_storage_access_in_on_connect(screen: Screen):
  227. @ui.page('/')
  228. def root():
  229. app.storage.user['value'] = 'Test'
  230. app.on_connect(lambda: ui.label(app.storage.user.get('value')))
  231. screen.ui_run_kwargs['storage_secret'] = 'secret'
  232. screen.open('/')
  233. screen.should_contain('Test')
  234. def test_storage_access_in_binding_function(screen: Screen):
  235. model = {'name': 'John'}
  236. @ui.page('/')
  237. def index():
  238. def f(v):
  239. return v + app.storage.user.get('surname', '')
  240. ui.label().bind_text_from(model, 'name', backward=f)
  241. screen.ui_run_kwargs['storage_secret'] = 'secret'
  242. screen.open('/')
  243. screen.assert_py_logger('ERROR', 'app.storage.user can only be used within a UI context')