1
0

binding.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. #!/usr/bin/env python3
  2. import asyncio
  3. from collections import defaultdict
  4. from typing import Any, Callable, Optional, Set, Tuple
  5. from justpy.htmlcomponents import HTMLBaseComponent
  6. from .globals import config
  7. from .task_logger import create_task
  8. bindings = defaultdict(list)
  9. bindable_properties = dict()
  10. active_links = []
  11. async def loop():
  12. while True:
  13. visited: Set[Tuple[int, str]] = set()
  14. visited_views: Set[HTMLBaseComponent] = set()
  15. for link in active_links:
  16. (source_obj, source_name, target_obj, target_name, transform) = link
  17. value = transform(getattr(source_obj, source_name))
  18. if getattr(target_obj, target_name) != value:
  19. setattr(target_obj, target_name, value)
  20. propagate(target_obj, target_name, visited, visited_views)
  21. update_views(visited_views)
  22. await asyncio.sleep(config.binding_refresh_interval)
  23. async def update_views_async(views: Set[HTMLBaseComponent]):
  24. for view in views:
  25. await view.update()
  26. def update_views(views: Set[HTMLBaseComponent]):
  27. if asyncio._get_running_loop() is None:
  28. return # NOTE: no need to update view if event loop is not running, yet
  29. create_task(update_views_async(views), name='update_views_async')
  30. def propagate(source_obj,
  31. source_name,
  32. visited: Set[Tuple[int, str]] = None,
  33. visited_views: Set[HTMLBaseComponent] = None) -> Set[HTMLBaseComponent]:
  34. if visited is None:
  35. visited = set()
  36. if visited_views is None:
  37. visited_views = set()
  38. visited.add((id(source_obj), source_name))
  39. if isinstance(source_obj, HTMLBaseComponent):
  40. visited_views.add(source_obj)
  41. for _, target_obj, target_name, transform in bindings[(id(source_obj), source_name)]:
  42. if (id(target_obj), target_name) in visited:
  43. continue
  44. target_value = transform(getattr(source_obj, source_name))
  45. if getattr(target_obj, target_name) != target_value:
  46. setattr(target_obj, target_name, target_value)
  47. propagate(target_obj, target_name, visited, visited_views)
  48. return visited_views
  49. def bind_to(self_obj: Any, self_name: str, other_obj: Any, other_name: str, forward: Callable):
  50. bindings[(id(self_obj), self_name)].append((self_obj, other_obj, other_name, forward))
  51. if (id(self_obj), self_name) not in bindable_properties:
  52. active_links.append((self_obj, self_name, other_obj, other_name, forward))
  53. update_views(propagate(self_obj, self_name))
  54. def bind_from(self_obj: Any, self_name: str, other_obj: Any, other_name: str, backward: Callable):
  55. bindings[(id(other_obj), other_name)].append((other_obj, self_obj, self_name, backward))
  56. if (id(other_obj), other_name) not in bindable_properties:
  57. active_links.append((other_obj, other_name, self_obj, self_name, backward))
  58. update_views(propagate(other_obj, other_name))
  59. class BindableProperty:
  60. def __init__(self, on_change: Optional[Callable] = None):
  61. self.on_change = on_change
  62. def __set_name__(self, _, name: str):
  63. self.name = name
  64. def __get__(self, owner: Any, _=None):
  65. return getattr(owner, '_' + self.name)
  66. def __set__(self, owner: Any, value: Any):
  67. value_changed = getattr(owner, '_' + self.name, value) != value
  68. setattr(owner, '_' + self.name, value)
  69. bindable_properties[(id(owner), self.name)] = owner
  70. update_views(propagate(owner, self.name))
  71. if value_changed and self.on_change is not None:
  72. self.on_change(owner, value)