test_storage.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import asyncio
  2. from pathlib import Path
  3. import httpx
  4. from nicegui import Client, app, background_tasks, ui
  5. from nicegui.testing import Screen
  def test_browser_data_is_stored_in_the_browser(screen: Screen):
      """Each visit bumps a counter in browser storage; a FastAPI route can read it too."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      def page():
          previous_visits = app.storage.browser.get('count', 0)
          app.storage.browser['count'] = previous_visits + 1
          ui.label().bind_text_from(app.storage.browser, 'count')

      @app.get('/count')
      def count():
          current = app.storage.browser['count']
          return 'count = ' + str(current)

      # three consecutive visits must show 1, 2 and 3
      for expected in ('1', '2', '3'):
          screen.open('/')
          screen.should_contain(expected)

      screen.open('/count')
      screen.should_contain('count = 3')  # also works with FastAPI endpoints
  def test_browser_storage_supports_asyncio(screen: Screen):
      """The browser counter keeps counting when an async page is opened from two tabs."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      async def page():
          incremented = app.storage.browser.get('count', 0) + 1
          app.storage.browser['count'] = incremented
          await asyncio.sleep(0.5)
          # read the value again after the sleep instead of reusing the local
          ui.label(app.storage.browser['count'])

      screen.open('/')
      # alternate between the second and the first tab
      for tab_index, expected in ((1, '2'), (0, '3')):
          screen.switch_to(tab_index)
          screen.open('/')
          screen.should_contain(expected)
  def test_browser_storage_modifications_after_page_load_are_forbidden(screen: Screen):
      """Writing to browser storage after the client has connected surfaces a TypeError message."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      async def page(client: Client):
          await client.connected()
          try:
              app.storage.browser['test'] = 'data'
          except TypeError as error:
              # show the error text on the page so the test can look for it
              message = str(error)
              ui.label(message)

      screen.open('/')
      screen.should_contain('response to the browser has already been built')
  def test_user_storage_modifications(screen: Screen):
      """User storage can be changed both before and after the client has connected."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      async def page(client: Client, delayed: bool = False):
          if delayed:
              await client.connected()
          previous_visits = app.storage.user.get('count', 0)
          app.storage.user['count'] = previous_visits + 1
          ui.label().bind_text_from(app.storage.user, 'count')

      # the second visit modifies the storage only after the connection is established
      visits = (
          ('/', '1'),
          ('/?delayed=True', '2'),
          ('/', '3'),
      )
      for path, expected in visits:
          screen.open(path)
          screen.should_contain(expected)
  async def test_access_user_storage_from_fastapi(screen: Screen):
      """A plain FastAPI endpoint can write to user storage, which ends up in the storage file."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @app.get('/api')
      def api():
          app.storage.user['msg'] = 'yes'
          return 'OK'

      screen.open('/')
      async with httpx.AsyncClient() as session:
          reply = await session.get(f'http://localhost:{Screen.PORT}/api')
          assert reply.status_code == 200
          assert reply.text == '"OK"'
          await asyncio.sleep(0.5)  # wait for storage to be written
          storage_file = next(Path('.nicegui').glob('storage-user-*.json'))
          assert storage_file.read_text('utf-8') == '{"msg":"yes"}'
  def test_access_user_storage_on_interaction(screen: Screen):
      """Toggling a switch bound to user storage is written to the storage file."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      async def page():
          if 'test_switch' not in app.storage.user:
              app.storage.user['test_switch'] = False
          toggle = ui.switch('switch')
          toggle.bind_value(app.storage.user, 'test_switch')

      screen.open('/')
      screen.click('switch')
      screen.wait(0.5)
      storage_file = next(Path('.nicegui').glob('storage-user-*.json'))
      assert storage_file.read_text('utf-8') == '{"test_switch":true}'
  def test_access_user_storage_from_button_click_handler(screen: Screen):
      """Updating user storage from a button's click handler is written to the storage file."""
      @ui.page('/')
      async def page():
          # The lambda defers the update until the button is clicked. Passing
          # `app.storage.user.update(...)` directly would run the update while the
          # page is being built and register its return value as the handler, so
          # the click itself would never be exercised.
          ui.button('test', on_click=lambda: app.storage.user.update(inner_function='works'))

      screen.ui_run_kwargs['storage_secret'] = 'just a test'
      screen.open('/')
      screen.click('test')
      screen.wait(1)  # give the storage time to be written
      assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"inner_function":"works"}'
  async def test_access_user_storage_from_background_task(screen: Screen):
      """A background task started by the page builder can write to user storage."""
      @ui.page('/')
      def page():
          async def subtask():
              await asyncio.sleep(0.1)
              app.storage.user['subtask'] = 'works'
          background_tasks.create(subtask())

      screen.ui_run_kwargs['storage_secret'] = 'just a test'
      screen.open('/')
      # The subtask sleeps before writing, so asserting right after the page has
      # opened races against it; wait like the other storage-file tests do.
      await asyncio.sleep(0.5)
      assert next(Path('.nicegui').glob('storage-user-*.json')).read_text('utf-8') == '{"subtask":"works"}'
  def test_user_and_general_storage_is_persisted(screen: Screen):
      """After cookies are deleted the user counter restarts while the general counter continues."""
      screen.ui_run_kwargs['storage_secret'] = 'just a test'

      @ui.page('/')
      def page():
          user_visits = app.storage.user.get('count', 0)
          app.storage.user['count'] = user_visits + 1
          general_visits = app.storage.general.get('count', 0)
          app.storage.general['count'] = general_visits + 1
          ui.label(f'user: {app.storage.user["count"]}')
          ui.label(f'general: {app.storage.general["count"]}')

      # three visits from the same browser session
      for _ in range(3):
          screen.open('/')
      screen.should_contain('user: 3')
      screen.should_contain('general: 3')

      # without cookies, only the general counter keeps its value
      screen.selenium.delete_all_cookies()
      screen.open('/')
      screen.should_contain('user: 1')
      screen.should_contain('general: 4')
  def test_rapid_storage(screen: Screen):
      """Three back-to-back updates of general storage all end up in the storage file."""
      # https://github.com/zauberzeug/nicegui/issues/1099
      def store_three_values():
          app.storage.general.update(one=1)
          app.storage.general.update(two=2)
          app.storage.general.update(three=3)

      ui.button('test', on_click=store_three_values)

      screen.open('/')
      screen.click('test')
      screen.wait(0.5)
      storage_file = Path('.nicegui', 'storage-general.json')
      assert storage_file.read_text('utf-8') == '{"one":1,"two":2,"three":3}'
  132. assert Path('.nicegui', 'storage-general.json').read_text('utf-8') == '{"one":1,"two":2,"three":3}'