_variable_directory.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import re
  12. import typing as t
  13. from types import FrameType
  14. from ._locals_context import _LocalsContext
  15. from .get_imported_var import _get_imported_var
  16. from .get_module_name import _get_module_name_from_frame, _get_module_name_from_imported_var
  17. class _VariableDirectory:
  18. def __init__(self, locals_context: _LocalsContext):
  19. self._locals_context = locals_context
  20. self._default_module = ""
  21. self._var_dir: t.Dict[str, t.Dict] = {}
  22. self._var_head: t.Dict[str, t.List[t.Tuple[str, str]]] = {}
  23. self._imported_var_dir: t.Dict[str, t.List[t.Tuple[str, str, str]]] = {}
  24. def set_default(self, frame: FrameType) -> None:
  25. self._default_module = _get_module_name_from_frame(frame)
  26. self.add_frame(frame)
  27. def add_frame(self, frame: t.Optional[FrameType]) -> None:
  28. if frame is None:
  29. return
  30. module_name = _get_module_name_from_frame(frame)
  31. if module_name not in self._imported_var_dir:
  32. imported_var_list = _get_imported_var(frame)
  33. self._imported_var_dir[module_name] = imported_var_list
  34. def pre_process_module_import_all(self) -> None:
  35. for imported_dir in self._imported_var_dir.values():
  36. additional_var_list: t.List[t.Tuple[str, str, str]] = []
  37. for name, asname, module in imported_dir:
  38. if name != "*" or asname != "*":
  39. continue
  40. if module not in self._locals_context._locals_map.keys():
  41. continue
  42. with self._locals_context.set_locals_context(module):
  43. additional_var_list.extend(
  44. (v, v, module) for v in self._locals_context.get_locals().keys() if not v.startswith("_")
  45. )
  46. imported_dir.extend(additional_var_list)
  47. def process_imported_var(self) -> None:
  48. self.pre_process_module_import_all()
  49. default_imported_dir = self._imported_var_dir[self._default_module]
  50. with self._locals_context.set_locals_context(self._default_module):
  51. for name, asname, module in default_imported_dir:
  52. if name == "*" and asname == "*":
  53. continue
  54. imported_module_name = _get_module_name_from_imported_var(
  55. name, self._locals_context.get_locals().get(asname, None), module
  56. )
  57. temp_var_name = self.add_var(asname, self._default_module)
  58. self.add_var(name, imported_module_name, temp_var_name)
  59. for k, v in self._imported_var_dir.items():
  60. with self._locals_context.set_locals_context(k):
  61. for name, asname, module in v:
  62. if name == "*" and asname == "*":
  63. continue
  64. imported_module_name = _get_module_name_from_imported_var(
  65. name, self._locals_context.get_locals().get(asname, None), module
  66. )
  67. var_name = self.get_var(name, imported_module_name)
  68. var_asname = self.get_var(asname, k)
  69. if var_name is None and var_asname is None:
  70. temp_var_name = self.add_var(asname, k)
  71. self.add_var(name, imported_module_name, temp_var_name)
  72. elif var_name is not None:
  73. self.add_var(asname, k, var_name)
  74. else:
  75. self.add_var(name, imported_module_name, var_asname)
  76. def add_var(self, name: str, module: t.Optional[str], var_name: t.Optional[str] = None) -> str:
  77. if module is None:
  78. module = self._default_module
  79. if gv := self.get_var(name, module):
  80. return gv
  81. var_encode = _variable_encode(name, module) if module != self._default_module else name
  82. if var_name is None:
  83. var_name = var_encode
  84. self.__add_var_head(name, module, var_name)
  85. if var_encode != var_name:
  86. var_name_decode, module_decode = _variable_decode(var_name)
  87. if module_decode is None:
  88. module_decode = self._default_module
  89. self.__add_var_head(var_name_decode, module_decode, var_encode)
  90. if name not in self._var_dir:
  91. self._var_dir[name] = {module: var_name}
  92. else:
  93. self._var_dir[name][module] = var_name
  94. return var_name
  95. def __add_var_head(self, name: str, module: str, var_head: str) -> None:
  96. if var_head not in self._var_head:
  97. self._var_head[var_head] = [(name, module)]
  98. else:
  99. self._var_head[var_head].append((name, module))
  100. def get_var(self, name: str, module: str) -> t.Optional[str]:
  101. if name in self._var_dir and module in self._var_dir[name]:
  102. return self._var_dir[name][module]
  103. return None
  104. _MODULE_NAME_MAP: t.List[str] = []
  105. _MODULE_ID = "_TPMDL_"
  106. _RE_TPMDL_DECODE = re.compile(r"(.*?)" + _MODULE_ID + r"(\d+)$")
  107. def _is_moduled_variable(var_name: str):
  108. return _MODULE_ID in var_name
  109. def _variable_encode(var_name: str, module_name: t.Optional[str]):
  110. if module_name is None:
  111. return var_name
  112. if module_name not in _MODULE_NAME_MAP:
  113. _MODULE_NAME_MAP.append(module_name)
  114. return f"{var_name}{_MODULE_ID}{_MODULE_NAME_MAP.index(module_name)}"
  115. def _variable_decode(var_name: str):
  116. from ._evaluator import _Evaluator
  117. if result := _RE_TPMDL_DECODE.match(var_name):
  118. return _Evaluator._expr_decode(str(result[1])), _MODULE_NAME_MAP[int(result[2])]
  119. return _Evaluator._expr_decode(var_name), None
  120. def _reset_name_map():
  121. _MODULE_NAME_MAP.clear()