123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- import asyncio
- import copy
- import sys
- from nicegui import ui
- from nicegui.observables import ObservableDict, ObservableList, ObservableSet
- from nicegui.testing import Screen
# pylint: disable=global-statement

# Module-level tally of change notifications; the handler functions below
# modify it and the tests read it to check how many notifications fired.
count = 0
def reset_counter():
    """Set the module-level ``count`` back to zero."""
    global count  # noqa: PLW0603
    count = 0
def increment_counter():
    """Add one to the module-level ``count`` (synchronous change handler)."""
    global count  # noqa: PLW0603
    count = count + 1
async def increment_counter_slowly(_):
    """Add one to the module-level ``count`` after a 0.1 s asyncio sleep.

    The single positional argument is accepted but ignored.
    """
    global count  # noqa: PLW0603
    # The increment only happens once the sleep has completed.
    await asyncio.sleep(0.1)
    count = count + 1
def test_observable_dict():
    """Each mutating operation on an ObservableDict bumps the counter by one."""
    reset_counter()
    mapping = ObservableDict(on_change=increment_counter)

    # Item assignment and deletion.
    mapping['a'] = 1
    assert count == 1
    del mapping['a']
    assert count == 2

    # Bulk insert followed by the removing methods.
    mapping.update({'b': 2, 'c': 3})
    assert count == 3
    mapping.pop('b')
    assert count == 4
    mapping.popitem()
    assert count == 5
    mapping.clear()
    assert count == 6

    mapping.setdefault('a', 1)
    assert count == 7

    # The in-place merge operator is only exercised on Python 3.9 and newer.
    if sys.version_info >= (3, 9):
        mapping |= {'b': 2}
        assert count == 8
def test_observable_list():
    """Each mutating operation on an ObservableList bumps the counter by one."""
    reset_counter()
    items = ObservableList(on_change=increment_counter)

    # Growing the list.
    items.append(1)
    assert count == 1
    items.extend([2, 3, 4])
    assert count == 2
    items.insert(0, 0)
    assert count == 3

    # Removing single elements.
    items.remove(1)
    assert count == 4
    items.pop(-1)
    assert count == 5

    # In-place reordering.
    items.sort()
    assert count == 6
    items.reverse()
    assert count == 7

    # Index and slice assignment.
    items[0] = 1
    assert count == 8
    items[0:2] = [1, 2, 3]
    assert count == 9

    # Index and slice deletion.
    del items[0]
    assert count == 10
    del items[0:1]
    assert count == 11

    items.clear()
    assert count == 12

    # In-place concatenation operator.
    items += [1, 2, 3]
    assert count == 13
def test_observable_set():
    """Each mutating operation on an ObservableSet bumps the counter by one."""
    reset_counter()
    numbers = ObservableSet({1, 2, 3, 4, 5}, on_change=increment_counter)

    # Adding an element that is already present still counts as a change.
    numbers.add(1)
    assert count == 1

    # Removing methods.
    numbers.remove(1)
    assert count == 2
    numbers.discard(2)
    assert count == 3
    numbers.pop()
    assert count == 4
    numbers.clear()
    assert count == 5

    # Named in-place update methods.
    numbers.update({1, 2, 3})
    assert count == 6
    numbers.intersection_update({1, 2})
    assert count == 7
    numbers.difference_update({1})
    assert count == 8
    numbers.symmetric_difference_update({1, 2})
    assert count == 9

    # The same four updates spelled as in-place operators.
    numbers |= {1, 2, 3}
    assert count == 10
    numbers &= {1, 2}
    assert count == 11
    numbers -= {1}
    assert count == 12
    numbers ^= {1, 2}
    assert count == 13
def test_nested_observables():
    """Changes to containers nested inside an ObservableDict bump the counter."""
    reset_counter()
    initial = {
        'a': 1,
        'b': [1, 2, 3, {'x': 1, 'y': 2, 'z': 3}],
        'c': {'x': 1, 'y': 2, 'z': 3, 't': [1, 2, 3]},
        'd': {1, 2, 3},
    }
    nested = ObservableDict(initial, on_change=increment_counter)

    # Top-level key.
    nested['a'] = 42
    assert count == 1

    # List stored in the dict, then the dict stored inside that list.
    nested['b'].append(4)
    assert count == 2
    nested['b'][3].update(t=4)
    assert count == 3

    # Dict stored in the dict, then the list stored inside that dict.
    nested['c']['x'] = 2
    assert count == 4
    nested['c']['t'].append(4)
    assert count == 5

    # Set stored in the dict.
    nested['d'].add(4)
    assert count == 6
def test_async_handler(screen: Screen):
    """A coroutine function used as ``on_change`` runs after a UI-triggered append.

    The handler sleeps for 0.1 s before incrementing, so the counter is
    checked only after a 0.5 s wait following the click.
    """
    reset_counter()
    items = ObservableList(on_change=increment_counter_slowly)
    ui.button('Append 42', on_click=lambda: items.append(42))

    screen.open('/')
    # Nothing has been appended yet, so no notification has fired.
    assert count == 0

    screen.click('Append 42')
    screen.wait(0.5)
    assert count == 1
def test_setting_change_handler():
    """A handler registered via ``on_change()`` only sees later mutations."""
    reset_counter()
    items = ObservableList()

    # No handler is registered yet, so this append goes unnoticed.
    items.append(1)
    assert count == 0

    # After registration the next append is counted.
    items.on_change(increment_counter)
    items.append(2)
    assert count == 1
def test_copy():
    """``copy.copy`` shares the inner lists; ``copy.deepcopy`` is independent."""
    original = ObservableList([[1, 2, 3], [4, 5, 6]])
    shallow = copy.copy(original)
    deep = copy.deepcopy(original)

    # Mutate the original at the outer level and inside an inner list.
    original.append([7, 8, 9])
    original[0][0] = 0

    assert original == [[0, 2, 3], [4, 5, 6], [7, 8, 9]]
    # The shallow copy misses the append but sees the inner-list change.
    assert shallow == [[0, 2, 3], [4, 5, 6]]
    # The deep copy is unaffected by both mutations.
    assert deep == [[1, 2, 3], [4, 5, 6]]
|