|
@@ -2,6 +2,7 @@ import asyncio
|
|
|
import logging
|
|
|
import time
|
|
|
from collections import defaultdict
|
|
|
+from collections.abc import Mapping
|
|
|
from typing import Any, Callable, DefaultDict, Dict, List, Optional, Set, Tuple, Type, Union
|
|
|
|
|
|
from . import globals
|
|
@@ -11,14 +12,21 @@ bindable_properties: Dict[Tuple[int, str], Any] = {}
|
|
|
active_links: List[Tuple[Any, str, Any, str, Callable[[Any], Any]]] = []
|
|
|
|
|
|
|
|
|
-def get_attribute(obj: Union[object, Dict], name: str) -> Any:
|
|
|
- if isinstance(obj, dict):
|
|
|
+def has_attribute(obj: Union[object, Mapping], name: str) -> Any:
|
|
|
+ if isinstance(obj, Mapping):
|
|
|
+ return name in obj
|
|
|
+ else:
|
|
|
+ return hasattr(obj, name)
|
|
|
+
|
|
|
+
|
|
|
+def get_attribute(obj: Union[object, Mapping], name: str) -> Any:
|
|
|
+ if isinstance(obj, Mapping):
|
|
|
return obj[name]
|
|
|
else:
|
|
|
return getattr(obj, name)
|
|
|
|
|
|
|
|
|
-def set_attribute(obj: Union[object, Dict], name: str, value: Any) -> None:
|
|
|
+def set_attribute(obj: Union[object, Mapping], name: str, value: Any) -> None:
|
|
|
if isinstance(obj, dict):
|
|
|
obj[name] = value
|
|
|
else:
|
|
@@ -31,10 +39,11 @@ async def loop() -> None:
|
|
|
t = time.time()
|
|
|
for link in active_links:
|
|
|
(source_obj, source_name, target_obj, target_name, transform) = link
|
|
|
- value = transform(get_attribute(source_obj, source_name))
|
|
|
- if get_attribute(target_obj, target_name) != value:
|
|
|
- set_attribute(target_obj, target_name, value)
|
|
|
- propagate(target_obj, target_name, visited)
|
|
|
+ if has_attribute(source_obj, source_name):
|
|
|
+ value = transform(get_attribute(source_obj, source_name))
|
|
|
+ if not has_attribute(target_obj, target_name) or get_attribute(target_obj, target_name) != value:
|
|
|
+ set_attribute(target_obj, target_name, value)
|
|
|
+ propagate(target_obj, target_name, visited)
|
|
|
del link, source_obj, target_obj
|
|
|
if time.time() - t > 0.01:
|
|
|
logging.warning(f'binding propagation for {len(active_links)} active links took {time.time() - t:.3f} s')
|
|
@@ -48,10 +57,11 @@ def propagate(source_obj: Any, source_name: str, visited: Optional[Set[Tuple[int
|
|
|
for _, target_obj, target_name, transform in bindings.get((id(source_obj), source_name), []):
|
|
|
if (id(target_obj), target_name) in visited:
|
|
|
continue
|
|
|
- target_value = transform(get_attribute(source_obj, source_name))
|
|
|
- if get_attribute(target_obj, target_name) != target_value:
|
|
|
- set_attribute(target_obj, target_name, target_value)
|
|
|
- propagate(target_obj, target_name, visited)
|
|
|
+ if has_attribute(source_obj, source_name):
|
|
|
+ target_value = transform(get_attribute(source_obj, source_name))
|
|
|
+ if not has_attribute(target_obj, target_name) or get_attribute(target_obj, target_name) != target_value:
|
|
|
+ set_attribute(target_obj, target_name, target_value)
|
|
|
+ propagate(target_obj, target_name, visited)
|
|
|
|
|
|
|
|
|
def bind_to(self_obj: Any, self_name: str, other_obj: Any, other_name: str, forward: Callable[[Any], Any]) -> None:
|