|
@@ -43,6 +43,8 @@ class _Element(ABC):
|
|
_DEFAULT_PROPERTY = ""
|
|
_DEFAULT_PROPERTY = ""
|
|
__RE_INDEXED_PROPERTY = re.compile(r"^(.*?)__([\w\d]+)$")
|
|
__RE_INDEXED_PROPERTY = re.compile(r"^(.*?)__([\w\d]+)$")
|
|
_NEW_LAMBDA_NAME = "new_lambda"
|
|
_NEW_LAMBDA_NAME = "new_lambda"
|
|
|
|
+ _TAIPY_EMBEDDED_PREFIX = "_tp_embedded_"
|
|
|
|
+ _EMBEDED_PROPERTIES = ["decimator"]
|
|
|
|
|
|
def __new__(cls, *args, **kwargs):
|
|
def __new__(cls, *args, **kwargs):
|
|
obj = super(_Element, cls).__new__(cls)
|
|
obj = super(_Element, cls).__new__(cls)
|
|
@@ -93,47 +95,66 @@ class _Element(ABC):
|
|
return value
|
|
return value
|
|
if isinstance(value, FunctionType):
|
|
if isinstance(value, FunctionType):
|
|
if key.startswith("on_"):
|
|
if key.startswith("on_"):
|
|
- if value.__name__.startswith("<"):
|
|
|
|
- return value
|
|
|
|
- return value.__name__
|
|
|
|
-
|
|
|
|
- try:
|
|
|
|
- source = inspect.findsource(value)
|
|
|
|
- st = ast.parse("".join(source[0]))
|
|
|
|
- lambda_by_name: t.Dict[str, ast.Lambda] = {}
|
|
|
|
- _LambdaByName(self._ELEMENT_NAME, source[1], lambda_by_name).visit(st)
|
|
|
|
- lambda_fn = lambda_by_name.get(
|
|
|
|
- key,
|
|
|
|
- lambda_by_name.get(_LambdaByName._DEFAULT_NAME, None) if key == self._DEFAULT_PROPERTY else None,
|
|
|
|
- )
|
|
|
|
- if lambda_fn is not None:
|
|
|
|
- args = [arg.arg for arg in lambda_fn.args.args]
|
|
|
|
- targets = [
|
|
|
|
- compr.target.id # type: ignore[attr-defined]
|
|
|
|
- for node in ast.walk(lambda_fn.body)
|
|
|
|
- if isinstance(node, ast.ListComp)
|
|
|
|
- for compr in node.generators
|
|
|
|
- ]
|
|
|
|
- tree = _TransformVarToValue(self.__calling_frame, args + targets + _python_builtins).visit(
|
|
|
|
- lambda_fn
|
|
|
|
- )
|
|
|
|
- ast.fix_missing_locations(tree)
|
|
|
|
- if sys.version_info < (3, 9): # python 3.8 ast has no unparse
|
|
|
|
- string_fd = io.StringIO()
|
|
|
|
- _Unparser(tree, string_fd)
|
|
|
|
- string_fd.seek(0)
|
|
|
|
- lambda_text = string_fd.read()
|
|
|
|
- else:
|
|
|
|
- lambda_text = ast.unparse(tree)
|
|
|
|
- lambda_name = f"__lambda_{uuid.uuid4().hex}"
|
|
|
|
- self._lambdas[lambda_name] = lambda_text
|
|
|
|
- return f'{{{lambda_name}({", ".join(args)})}}'
|
|
|
|
- except Exception as e:
|
|
|
|
- _warn("Error in lambda expression", e)
|
|
|
|
|
|
+ return value if value.__name__.startswith("<") else value.__name__
|
|
|
|
+ # Parse lambda function
|
|
|
|
+ if (lambda_name := self.__parse_lambda_property(key, value)) is not None:
|
|
|
|
+ return lambda_name
|
|
|
|
+ # Embed value in the caller frame
|
|
|
|
+ if not isinstance(value, str) and key in self._EMBEDED_PROPERTIES:
|
|
|
|
+ return self.__embed_object(value, is_expression=False)
|
|
if hasattr(value, "__name__"):
|
|
if hasattr(value, "__name__"):
|
|
return str(getattr(value, "__name__")) # noqa: B009
|
|
return str(getattr(value, "__name__")) # noqa: B009
|
|
return str(value)
|
|
return str(value)
|
|
|
|
|
|
|
|
+ def __parse_lambda_property(self, key: str, value: t.Any) -> t.Any:
|
|
|
|
+ try:
|
|
|
|
+ source = inspect.findsource(value)
|
|
|
|
+ st = ast.parse("".join(source[0]))
|
|
|
|
+ lambda_by_name: t.Dict[str, ast.Lambda] = {}
|
|
|
|
+ _LambdaByName(self._ELEMENT_NAME, source[1], lambda_by_name).visit(st)
|
|
|
|
+ lambda_fn = lambda_by_name.get(
|
|
|
|
+ key,
|
|
|
|
+ lambda_by_name.get(_LambdaByName._DEFAULT_NAME, None) if key == self._DEFAULT_PROPERTY else None,
|
|
|
|
+ )
|
|
|
|
+ if lambda_fn is None:
|
|
|
|
+ return None
|
|
|
|
+ args = [arg.arg for arg in lambda_fn.args.args]
|
|
|
|
+ targets = [
|
|
|
|
+ compr.target.id # type: ignore[attr-defined]
|
|
|
|
+ for node in ast.walk(lambda_fn.body)
|
|
|
|
+ if isinstance(node, ast.ListComp)
|
|
|
|
+ for compr in node.generators
|
|
|
|
+ ]
|
|
|
|
+ tree = _TransformVarToValue(self.__calling_frame, args + targets + _python_builtins).visit(lambda_fn)
|
|
|
|
+ ast.fix_missing_locations(tree)
|
|
|
|
+ if sys.version_info < (3, 9): # python 3.8 ast has no unparse
|
|
|
|
+ string_fd = io.StringIO()
|
|
|
|
+ _Unparser(tree, string_fd)
|
|
|
|
+ string_fd.seek(0)
|
|
|
|
+ lambda_text = string_fd.read()
|
|
|
|
+ else:
|
|
|
|
+ lambda_text = ast.unparse(tree)
|
|
|
|
+ lambda_name = f"__lambda_{uuid.uuid4().hex}"
|
|
|
|
+ self._lambdas[lambda_name] = lambda_text
|
|
|
|
+ return f'{{{lambda_name}({", ".join(args)})}}'
|
|
|
|
+ except Exception as e:
|
|
|
|
+ _warn("Error in lambda expression", e)
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ def __embed_object(self, obj: t.Any, is_expression=True) -> str:
|
|
|
|
+ """Embed an object in the caller frame
|
|
|
|
+
|
|
|
|
+ Return the Taipy expression of the embedded object
|
|
|
|
+ """
|
|
|
|
+ frame_locals = self.__calling_frame.f_locals
|
|
|
|
+ obj_var_name = self._TAIPY_EMBEDDED_PREFIX + obj.__class__.__name__
|
|
|
|
+ index = 0
|
|
|
|
+ while f"{obj_var_name}_{index}" in frame_locals:
|
|
|
|
+ index += 1
|
|
|
|
+ obj_var_name = f"{obj_var_name}_{index}"
|
|
|
|
+ frame_locals[obj_var_name] = obj
|
|
|
|
+ return f"{{{obj_var_name}}}" if is_expression else obj_var_name
|
|
|
|
+
|
|
@abstractmethod
|
|
@abstractmethod
|
|
def _render(self, gui: "Gui") -> str:
|
|
def _render(self, gui: "Gui") -> str:
|
|
pass
|
|
pass
|