binding.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import asyncio
  2. import time
  3. from collections import defaultdict
  4. from collections.abc import Mapping
  5. from typing import Any, Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple, Type, Union
  6. from . import core
  7. from .logging import log
  8. MAX_PROPAGATION_TIME = 0.01
  9. bindings: DefaultDict[Tuple[int, str], List] = defaultdict(list)
  10. bindable_properties: Dict[Tuple[int, str], Any] = {}
  11. active_links: List[Tuple[Any, str, Any, str, Callable[[Any], Any]]] = []
  12. def _has_attribute(obj: Union[object, Mapping], name: str) -> Any:
  13. if isinstance(obj, Mapping):
  14. return name in obj
  15. return hasattr(obj, name)
  16. def _get_attribute(obj: Union[object, Mapping], name: str) -> Any:
  17. if isinstance(obj, Mapping):
  18. return obj[name]
  19. return getattr(obj, name)
  20. def _set_attribute(obj: Union[object, Mapping], name: str, value: Any) -> None:
  21. if isinstance(obj, dict):
  22. obj[name] = value
  23. else:
  24. setattr(obj, name, value)
  25. async def refresh_loop() -> None:
  26. """Refresh all bindings in an endless loop."""
  27. while True:
  28. _refresh_step()
  29. await asyncio.sleep(core.app.config.binding_refresh_interval)
  30. def _refresh_step() -> None:
  31. visited: Set[Tuple[int, str]] = set()
  32. t = time.time()
  33. for link in active_links:
  34. (source_obj, source_name, target_obj, target_name, transform) = link
  35. if _has_attribute(source_obj, source_name):
  36. value = transform(_get_attribute(source_obj, source_name))
  37. if not _has_attribute(target_obj, target_name) or _get_attribute(target_obj, target_name) != value:
  38. _set_attribute(target_obj, target_name, value)
  39. _propagate(target_obj, target_name, visited)
  40. del link, source_obj, target_obj # pylint: disable=modified-iterating-list
  41. if time.time() - t > MAX_PROPAGATION_TIME:
  42. log.warning(f'binding propagation for {len(active_links)} active links took {time.time() - t:.3f} s')
  43. def _propagate(source_obj: Any, source_name: str, visited: Optional[Set[Tuple[int, str]]] = None) -> None:
  44. if visited is None:
  45. visited = set()
  46. visited.add((id(source_obj), source_name))
  47. for _, target_obj, target_name, transform in bindings.get((id(source_obj), source_name), []):
  48. if (id(target_obj), target_name) in visited:
  49. continue
  50. if _has_attribute(source_obj, source_name):
  51. target_value = transform(_get_attribute(source_obj, source_name))
  52. if not _has_attribute(target_obj, target_name) or _get_attribute(target_obj, target_name) != target_value:
  53. _set_attribute(target_obj, target_name, target_value)
  54. _propagate(target_obj, target_name, visited)
  55. def bind_to(self_obj: Any, self_name: str, other_obj: Any, other_name: str, forward: Callable[[Any], Any]) -> None:
  56. """Bind the property of one object to the property of another object.
  57. The binding works one way only, from the first object to the second.
  58. :param self_obj: The object to bind from.
  59. :param self_name: The name of the property to bind from.
  60. :param other_obj: The object to bind to.
  61. :param other_name: The name of the property to bind to.
  62. :param forward: A function to apply to the value before applying it.
  63. """
  64. bindings[(id(self_obj), self_name)].append((self_obj, other_obj, other_name, forward))
  65. if (id(self_obj), self_name) not in bindable_properties:
  66. active_links.append((self_obj, self_name, other_obj, other_name, forward))
  67. _propagate(self_obj, self_name)
  68. def bind_from(self_obj: Any, self_name: str, other_obj: Any, other_name: str, backward: Callable[[Any], Any]) -> None:
  69. """Bind the property of one object from the property of another object.
  70. The binding works one way only, from the second object to the first.
  71. :param self_obj: The object to bind to.
  72. :param self_name: The name of the property to bind to.
  73. :param other_obj: The object to bind from.
  74. :param other_name: The name of the property to bind from.
  75. :param backward: A function to apply to the value before applying it.
  76. """
  77. bindings[(id(other_obj), other_name)].append((other_obj, self_obj, self_name, backward))
  78. if (id(other_obj), other_name) not in bindable_properties:
  79. active_links.append((other_obj, other_name, self_obj, self_name, backward))
  80. _propagate(other_obj, other_name)
  81. def bind(self_obj: Any, self_name: str, other_obj: Any, other_name: str, *,
  82. forward: Callable[[Any], Any] = lambda x: x, backward: Callable[[Any], Any] = lambda x: x) -> None:
  83. """Bind the property of one object to the property of another object.
  84. The binding works both ways, from the first object to the second and from the second to the first.
  85. :param self_obj: First object to bind.
  86. :param self_name: The name of the first property to bind.
  87. :param other_obj: The second object to bind.
  88. :param other_name: The name of the second property to bind.
  89. :param forward: A function to apply to the value before applying it to the second object.
  90. :param backward: A function to apply to the value before applying it to the first object.
  91. """
  92. bind_from(self_obj, self_name, other_obj, other_name, backward=backward)
  93. bind_to(self_obj, self_name, other_obj, other_name, forward=forward)
  94. class BindableProperty:
  95. def __init__(self, on_change: Optional[Callable[..., Any]] = None) -> None:
  96. self._change_handler = on_change
  97. def __set_name__(self, _, name: str) -> None:
  98. self.name = name # pylint: disable=attribute-defined-outside-init
  99. def __get__(self, owner: Any, _=None) -> Any:
  100. return getattr(owner, '___' + self.name)
  101. def __set__(self, owner: Any, value: Any) -> None:
  102. has_attr = hasattr(owner, '___' + self.name)
  103. value_changed = has_attr and getattr(owner, '___' + self.name) != value
  104. if has_attr and not value_changed:
  105. return
  106. setattr(owner, '___' + self.name, value)
  107. bindable_properties[(id(owner), self.name)] = owner
  108. _propagate(owner, self.name)
  109. if value_changed and self._change_handler is not None:
  110. self._change_handler(owner, value)
  111. def remove(objects: Iterable[Any], type_: Type) -> None:
  112. """Remove all bindings that involve the given objects.
  113. The ``type_`` argument is as a quick pre-filter.
  114. :param objects: The objects to remove.
  115. :param type_: The type of the objects to remove.
  116. """
  117. active_links[:] = [
  118. (source_obj, source_name, target_obj, target_name, transform)
  119. for source_obj, source_name, target_obj, target_name, transform in active_links
  120. if not (isinstance(source_obj, type_) and source_obj in objects or
  121. isinstance(target_obj, type_) and target_obj in objects)
  122. ]
  123. for key, binding_list in list(bindings.items()):
  124. binding_list[:] = [
  125. (source_obj, target_obj, target_name, transform)
  126. for source_obj, target_obj, target_name, transform in binding_list
  127. if not (isinstance(source_obj, type_) and source_obj in objects or
  128. isinstance(target_obj, type_) and target_obj in objects)
  129. ]
  130. if not binding_list:
  131. del bindings[key]
  132. for (obj_id, name), obj in list(bindable_properties.items()):
  133. if isinstance(obj, type_) and obj in objects:
  134. del bindable_properties[(obj_id, name)]
  135. def reset() -> None:
  136. """Clear all bindings.
  137. This function is intended for testing purposes only.
  138. """
  139. bindings.clear()
  140. bindable_properties.clear()
  141. active_links.clear()