1
0

_evaluator.py 18 KB


  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from __future__ import annotations
  12. import ast
  13. import builtins
  14. import re
  15. import typing as t
  16. import warnings
  17. from .._warnings import TaipyGuiWarning, _warn
  18. if t.TYPE_CHECKING:
  19. from ..gui import Gui
  20. from . import (
  21. _get_client_var_name,
  22. _get_expr_var_name,
  23. _get_lambda_id,
  24. _getscopeattr,
  25. _getscopeattr_drill,
  26. _hasscopeattr,
  27. _MapDict,
  28. _setscopeattr,
  29. _setscopeattr_drill,
  30. _TaipyBase,
  31. _variable_decode,
  32. _variable_encode,
  33. )
  34. class _Evaluator:
  35. # Regex to separate content from inside curly braces when evaluating f string expressions
  36. __EXPR_RE = re.compile(r"\{(([^\}]*)([^\{]*))\}")
  37. __EXPR_IS_EXPR = re.compile(r"[^\\][{}]")
  38. __EXPR_IS_EDGE_CASE = re.compile(r"^\s*{([^}]*)}\s*$")
  39. __EXPR_VALID_VAR_EDGE_CASE = re.compile(r"^([a-zA-Z\.\_0-9\[\]]*)$")
  40. __EXPR_EDGE_CASE_F_STRING = re.compile(r"[\{]*[a-zA-Z_][a-zA-Z0-9_]*:.+")
  41. __IS_TAIPY_EXPR_RE = re.compile(r"TpExPr_(.*)")
  42. __IS_ARRAY_EXPR_RE = re.compile(r"[^[]*\[(\d+)][^]]*")
  43. __CLEAN_LAMBDA_RE = re.compile(r"^__lambda_[\d_]+(TPMDL_\d+)?(.*)$")
  44. def __init__(self, default_bindings: t.Dict[str, t.Any], shared_variable: t.List[str]) -> None:
  45. # key = expression, value = hashed value of the expression
  46. self.__expr_to_hash: t.Dict[str, str] = {}
  47. # key = hashed value of the expression, value = expression
  48. self.__hash_to_expr: t.Dict[str, str] = {}
  49. # key = variable name of the expression, key = list of related expressions
  50. # ex: {x + y}
  51. # "x_TPMDL_0": ["{x + y}"],
  52. # "y_TPMDL_0": ["{x + y}"],
  53. self.__var_to_expr_list: t.Dict[str, t.List[str]] = {}
  54. # key = expression, value = list of related variables
  55. # "{x + y}": {"x": "x_TPMDL_", "y": "y_TPMDL_0"}
  56. self.__expr_to_var_map: t.Dict[str, t.Dict[str, str]] = {}
  57. # instead of binding everywhere the types
  58. self.__global_ctx = default_bindings
  59. # expr to holders
  60. self.__expr_to_holders: t.Dict[str, t.Set[t.Type[_TaipyBase]]] = {}
  61. # shared variables between multiple clients
  62. self.__shared_variable = shared_variable
  63. @staticmethod
  64. def _expr_decode(s: str):
  65. return str(result[1]) if (result := _Evaluator.__IS_TAIPY_EXPR_RE.match(s)) else s
  66. def get_hash_from_expr(self, expr: str) -> str:
  67. return self.__expr_to_hash.get(expr, expr)
  68. def get_expr_from_hash(self, hash_val: str) -> str:
  69. return self.__hash_to_expr.get(hash_val, hash_val)
  70. def get_shared_variables(self) -> t.List[str]:
  71. return self.__shared_variable
  72. def _is_expression(self, expr: str) -> bool:
  73. return len(_Evaluator.__EXPR_IS_EXPR.findall(expr)) != 0
  74. def _fetch_expression_list(self, expr: str) -> t.List:
  75. return [v[0] for v in _Evaluator.__EXPR_RE.findall(expr)]
  76. def _analyze_expression(
  77. self, gui: Gui, expr: str, lazy_declare: t.Optional[bool] = False
  78. ) -> t.Tuple[t.Dict[str, t.Any], t.Dict[str, str]]:
  79. var_val: t.Dict[str, t.Any] = {}
  80. var_map: t.Dict[str, str] = {}
  81. non_vars = list(self.__global_ctx.keys())
  82. builtin_vars = dir(builtins)
  83. non_vars.extend(builtin_vars)
  84. # Get a list of expressions (value that has been wrapped in curly braces {}) and find variables to bind
  85. for e in self._fetch_expression_list(expr):
  86. var_name = e.split(sep=".")[0]
  87. st = ast.parse('f"{' + e + '}"' if _Evaluator.__EXPR_EDGE_CASE_F_STRING.match(e) else e)
  88. args = [arg.arg for node in ast.walk(st) if isinstance(node, ast.arguments) for arg in node.args]
  89. targets = [
  90. comprehension.target.id # type: ignore[attr-defined]
  91. for node in ast.walk(st)
  92. if isinstance(node, ast.ListComp)
  93. for comprehension in node.generators
  94. ]
  95. functionsCalls = set()
  96. for node in ast.walk(st):
  97. if isinstance(node, ast.Call):
  98. functionsCalls.add(node.func)
  99. elif isinstance(node, ast.Name):
  100. var_name = node.id.split(sep=".")[0]
  101. if var_name in builtin_vars:
  102. if node not in functionsCalls:
  103. _warn(
  104. f"Variable '{var_name}' cannot be used in Taipy expressions "
  105. "as its name collides with a Python built-in identifier."
  106. )
  107. elif var_name not in args and var_name not in targets and var_name not in non_vars:
  108. try:
  109. if lazy_declare and var_name.startswith("__"):
  110. with warnings.catch_warnings(record=True) as warns:
  111. warnings.resetwarnings()
  112. encoded_var_name = gui._bind_var(var_name)
  113. if next((w for w in warns if w.category is TaipyGuiWarning), None):
  114. gui._bind_var_val(var_name, None)
  115. else:
  116. encoded_var_name = gui._bind_var(var_name)
  117. var_val[var_name] = _getscopeattr_drill(gui, encoded_var_name)
  118. var_map[var_name] = encoded_var_name
  119. except AttributeError as e:
  120. _warn(f"Variable '{var_name}' is not defined (in expression '{expr}')", e)
  121. return var_val, var_map
  122. def __save_expression(
  123. self,
  124. gui: Gui,
  125. expr: str,
  126. expr_hash: t.Optional[str],
  127. expr_evaluated: t.Optional[t.Any],
  128. var_map: t.Dict[str, str],
  129. lambda_expr: t.Optional[bool] = False,
  130. ):
  131. if expr in self.__expr_to_hash:
  132. expr_hash = self.__expr_to_hash[expr]
  133. gui._bind_var_val(expr_hash, expr_evaluated)
  134. return expr_hash
  135. if expr_hash is None:
  136. expr_hash = _get_expr_var_name(expr)
  137. elif not lambda_expr:
  138. # if lambda expr, it has a hasname, we work with that
  139. # edge case, only a single variable
  140. expr_hash = f"tpec_{_get_client_var_name(expr)}"
  141. self.__expr_to_hash[expr] = expr_hash
  142. gui._bind_var_val(expr_hash, expr_evaluated)
  143. self.__hash_to_expr[expr_hash] = expr
  144. for var in var_map.values():
  145. if var not in self.__global_ctx.keys():
  146. lst = self.__var_to_expr_list.get(var)
  147. if lst is None:
  148. self.__var_to_expr_list[var] = [expr]
  149. else:
  150. lst.append(expr)
  151. if expr not in self.__expr_to_var_map:
  152. self.__expr_to_var_map[expr] = var_map
  153. # save expr_hash to shared variable if valid
  154. for encoded_var_name in var_map.values():
  155. var_name, module_name = _variable_decode(encoded_var_name)
  156. # only variables in the main module with be taken into account
  157. if module_name is not None and module_name != gui._get_default_module_name():
  158. continue
  159. if var_name in self.__shared_variable:
  160. self.__shared_variable.append(expr_hash)
  161. return expr_hash
  162. def evaluate_bind_holder(self, gui: Gui, holder: t.Type[_TaipyBase], expr: str) -> str:
  163. expr_hash = self.__expr_to_hash.get(expr, "unknownExpr")
  164. hash_name = self.__get_holder_hash(holder, expr_hash)
  165. expr_lit = expr.replace("'", "\\'")
  166. holder_expr = f"{holder.__name__}({expr},'{expr_lit}')"
  167. self.__evaluate_holder(gui, holder, expr)
  168. if a_set := self.__expr_to_holders.get(expr):
  169. a_set.add(holder)
  170. else:
  171. self.__expr_to_holders[expr] = {holder}
  172. self.__expr_to_hash[holder_expr] = hash_name
  173. # expression is only the first part ...
  174. expr = expr.split(".")[0]
  175. self.__expr_to_var_map[holder_expr] = {expr: expr}
  176. if a_list := self.__var_to_expr_list.get(expr):
  177. if holder_expr not in a_list:
  178. a_list.append(holder_expr)
  179. else:
  180. self.__var_to_expr_list[expr] = [holder_expr]
  181. return hash_name
  182. def evaluate_holders(self, gui: Gui, expr: str) -> t.List[str]:
  183. lst = []
  184. for hld in self.__expr_to_holders.get(expr, []):
  185. hash_val = self.__get_holder_hash(hld, self.__expr_to_hash.get(expr, ""))
  186. self.__evaluate_holder(gui, hld, expr)
  187. lst.append(hash_val)
  188. return lst
  189. @staticmethod
  190. def __get_holder_hash(holder: t.Type[_TaipyBase], expr_hash: str) -> str:
  191. return f"{holder.get_hash()}_{_get_client_var_name(expr_hash)}"
  192. def __evaluate_holder(self, gui: Gui, holder: t.Type[_TaipyBase], expr: str) -> t.Optional[_TaipyBase]:
  193. expr_hash = ""
  194. try:
  195. expr_hash = self.__expr_to_hash.get(expr, "unknownExpr")
  196. holder_hash = self.__get_holder_hash(holder, expr_hash)
  197. expr_value = _getscopeattr_drill(gui, expr_hash)
  198. holder_value = _getscopeattr(gui, holder_hash, None)
  199. if not isinstance(holder_value, _TaipyBase):
  200. holder_value = holder(expr_value, expr_hash)
  201. _setscopeattr(gui, holder_hash, holder_value)
  202. else:
  203. holder_value.set(expr_value)
  204. return holder_value
  205. except Exception as e:
  206. _warn(f"Cannot evaluate expression {holder.__name__}({expr_hash},'{expr_hash}') for {expr}", e)
  207. return None
  208. def evaluate_expr(
  209. self, gui: Gui, expr: str, lazy_declare: t.Optional[bool] = False, lambda_expr: t.Optional[bool] = False
  210. ) -> t.Any:
  211. if not self._is_expression(expr) and not lambda_expr:
  212. return expr
  213. if not lambda_expr and expr.startswith("{lambda ") and expr.endswith("}"):
  214. lambda_expr = True
  215. expr = expr[1:-1]
  216. var_val, var_map = ({}, {}) if lambda_expr else self._analyze_expression(gui, expr, lazy_declare)
  217. expr_hash = None
  218. is_edge_case = False
  219. # The expr_string is placed here in case expr get replaced by edge case
  220. expr_string = expr if lambda_expr else 'f"' + expr.replace('"', '\\"') + '"'
  221. # simplify expression if it only contains var_name
  222. m = _Evaluator.__EXPR_IS_EDGE_CASE.match(expr)
  223. if m and not _Evaluator.__EXPR_EDGE_CASE_F_STRING.match(expr):
  224. expr = m.group(1)
  225. expr_hash = expr if _Evaluator.__EXPR_VALID_VAR_EDGE_CASE.match(expr) else None
  226. is_edge_case = True
  227. # validate whether expression has already been evaluated
  228. module_name = gui._get_locals_context()
  229. not_encoded_expr = expr
  230. expr = f"TpExPr_{_variable_encode(expr, module_name)}"
  231. if expr in self.__expr_to_hash and _hasscopeattr(gui, self.__expr_to_hash[expr]):
  232. return self.__expr_to_hash[expr]
  233. try:
  234. # evaluate expressions
  235. ctx: t.Dict[str, t.Any] = {}
  236. ctx.update(self.__global_ctx)
  237. # entries in var_val are not always seen (NameError) when passed as locals
  238. ctx.update(var_val)
  239. with gui._get_authorization():
  240. expr_evaluated = eval(not_encoded_expr if is_edge_case else expr_string, ctx)
  241. except Exception as e:
  242. exception_str = not_encoded_expr if is_edge_case else expr_string
  243. _warn(
  244. f"Cannot evaluate expression '{_Evaluator._clean_exception_expr(exception_str)}'",
  245. e,
  246. always_show=True,
  247. )
  248. expr_evaluated = None
  249. if lambda_expr and callable(expr_evaluated):
  250. expr_hash = _get_lambda_id(expr_evaluated, module=module_name) # type: ignore[reportArgumentType]
  251. # save the expression if it needs to be re-evaluated
  252. return self.__save_expression(gui, expr, expr_hash, expr_evaluated, var_map, lambda_expr)
  253. def refresh_expr(self, gui: Gui, var_name: str, holder: t.Optional[_TaipyBase]):
  254. """
  255. This function will execute when the __request_var_update function receive a refresh order
  256. """
  257. expr = self.__hash_to_expr.get(var_name)
  258. if not expr:
  259. return
  260. expr_decoded, _ = _variable_decode(expr)
  261. var_map = self.__expr_to_var_map.get(expr, {})
  262. eval_dict = {k: _getscopeattr_drill(gui, gui._bind_var(v)) for k, v in var_map.items()}
  263. if self._is_expression(expr_decoded):
  264. expr_string = 'f"' + _variable_decode(expr)[0].replace('"', '\\"') + '"'
  265. else:
  266. expr_string = expr_decoded
  267. try:
  268. ctx: t.Dict[str, t.Any] = {}
  269. ctx.update(self.__global_ctx)
  270. ctx.update(eval_dict)
  271. expr_evaluated = eval(expr_string, ctx)
  272. _setscopeattr(gui, var_name, expr_evaluated)
  273. if holder is not None:
  274. holder.set(expr_evaluated)
  275. except Exception as e:
  276. _warn(f"Exception raised evaluating {_Evaluator._clean_exception_expr(expr_string)}", e)
  277. def re_evaluate_expr(self, gui: Gui, var_name: str) -> t.Set[str]: # noqa C901
  278. """
  279. This function will execute when the _update_var function is handling
  280. an expression with only a single variable
  281. """
  282. modified_vars: t.Set[str] = set()
  283. # Verify that the current hash is an edge case one (only a single variable inside the original expression)
  284. if var_name.startswith("tp_"):
  285. return modified_vars
  286. expr_original = None
  287. # if var_name starts with tpec_ --> it is an edge case with modified var
  288. if var_name.startswith("tpec_"):
  289. # backup for later reference
  290. var_name_original = var_name
  291. expr_original = self.__hash_to_expr[var_name]
  292. temp_expr_var_map = self.__expr_to_var_map[expr_original]
  293. if len(temp_expr_var_map) <= 1:
  294. index_in_array = int(m[0]) if (m := _Evaluator.__IS_ARRAY_EXPR_RE.findall(expr_original)) else -1
  295. # since this is an edge case --> only 1 item in the dict and that item is the original var
  296. var_name = next(iter(temp_expr_var_map.values()), var_name)
  297. # construct correct var_path to reassign values
  298. var_name_full, _ = _variable_decode(expr_original)
  299. var_name_full = var_name_full.split(".")
  300. var_name_full[0] = var_name
  301. var_name_full = ".".join(var_name_full)
  302. if index_in_array >= 0:
  303. array_val = _getscopeattr(gui, var_name)
  304. if isinstance(array_val, list) and len(array_val) > index_in_array:
  305. array_val[index_in_array] = _getscopeattr(gui, var_name_original)
  306. else:
  307. index_in_array = -1
  308. if index_in_array < 0:
  309. _setscopeattr_drill(gui, var_name_full, _getscopeattr(gui, var_name_original))
  310. else:
  311. # multiple key-value pair in expr_var_map --> expr is special case a["b"]
  312. key = ""
  313. for v in temp_expr_var_map.values():
  314. if isinstance(_getscopeattr(gui, v), _MapDict):
  315. var_name = v
  316. else:
  317. key = v
  318. if key == "":
  319. return modified_vars
  320. _setscopeattr_drill(gui, f"{var_name}.{_getscopeattr(gui, key)}", _getscopeattr(gui, var_name_original))
  321. # A middle check to see if var_name is from _MapDict
  322. if "." in var_name:
  323. var_name = var_name[: var_name.index(".")]
  324. # otherwise, that var_name is correct and doesn't require any resolution
  325. if var_name not in self.__var_to_expr_list:
  326. # _warn("{var_name} not found.")
  327. return modified_vars
  328. # refresh expressions and holders
  329. for expr in self.__var_to_expr_list[var_name]:
  330. expr_decoded, _ = _variable_decode(expr)
  331. hash_expr = self.__expr_to_hash.get(expr, "UnknownExpr")
  332. if expr != var_name and not expr.startswith(_TaipyBase._HOLDER_PREFIX):
  333. expr_var_map = self.__expr_to_var_map.get(expr) # ["x", "y"]
  334. if expr_var_map is None:
  335. _warn(f"Something is amiss with expression list for {expr}.")
  336. else:
  337. eval_dict = {k: _getscopeattr_drill(gui, gui._bind_var(v)) for k, v in expr_var_map.items()}
  338. if self._is_expression(expr_decoded):
  339. expr_string = 'f"' + _variable_decode(expr)[0].replace('"', '\\"') + '"'
  340. else:
  341. expr_string = expr_decoded
  342. try:
  343. ctx: t.Dict[str, t.Any] = {}
  344. ctx.update(self.__global_ctx)
  345. ctx.update(eval_dict)
  346. expr_evaluated = eval(expr_string, ctx)
  347. _setscopeattr(gui, hash_expr, expr_evaluated)
  348. except Exception as e:
  349. _warn(f"Exception raised evaluating {_Evaluator._clean_exception_expr(expr_string)}", e)
  350. # refresh holders if any
  351. for h in self.__expr_to_holders.get(expr, []):
  352. holder_hash = self.__get_holder_hash(h, self.get_hash_from_expr(expr))
  353. if holder_hash not in modified_vars:
  354. _setscopeattr(gui, holder_hash, self.__evaluate_holder(gui, h, expr))
  355. modified_vars.add(holder_hash)
  356. modified_vars.add(hash_expr)
  357. return modified_vars
  358. def _get_instance_in_context(self, name: str):
  359. return self.__global_ctx.get(name)
  360. @staticmethod
  361. def _clean_exception_expr(expr: str):
  362. return _Evaluator.__CLEAN_LAMBDA_RE.sub(r"<lambda>\2", expr)