_reload.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import functools
  12. from typing import Dict, Type
  13. import threading
  14. from ...common._check_dependencies import EnterpriseEditionUtils
  15. from .._manager._manager import _Manager
  16. from ..common._utils import _load_fct
  17. from ..notification import EventOperation, Notifier, _make_event
  18. class _Reloader:
  19. """The _Reloader singleton class"""
  20. _instance = None
  21. _lock = threading.RLock()
  22. _managers: Dict[str, Type[_Manager]] = {}
  23. def __new__(cls, *args, **kwargs):
  24. with cls._lock:
  25. if not isinstance(cls._instance, cls):
  26. cls._instance = super().__new__(cls, *args, **kwargs)
  27. cls._instance._no_reload_context = False # Initialize once
  28. cls._instance._context_depth = 0 # Track nested `with` usage
  29. cls._managers = cls._build_managers()
  30. return cls._instance
  31. def _reload(self, manager: str, obj):
  32. if self._no_reload_context:
  33. return obj
  34. entity = self._get_manager(manager)._get(obj, obj)
  35. if obj._is_in_context and hasattr(entity, "_properties"):
  36. if obj._properties._pending_changes:
  37. entity._properties._pending_changes = obj._properties._pending_changes
  38. if obj._properties._pending_deletions:
  39. entity._properties._pending_deletions = obj._properties._pending_deletions
  40. entity._properties._entity_owner = obj
  41. return entity
  42. def __enter__(self):
  43. with self._lock:
  44. self._context_depth += 1
  45. self._no_reload_context = True
  46. return self
  47. def __exit__(self, exc_type, exc_value, exc_traceback):
  48. with self._lock:
  49. self._context_depth -= 1
  50. if self._context_depth == 0:
  51. self._no_reload_context = False
  52. @classmethod
  53. @functools.lru_cache
  54. def _build_managers(cls) -> Dict[str, Type[_Manager]]:
  55. from ..cycle._cycle_manager_factory import _CycleManagerFactory
  56. from ..data._data_manager_factory import _DataManagerFactory
  57. from ..job._job_manager_factory import _JobManagerFactory
  58. from ..scenario._scenario_manager_factory import _ScenarioManagerFactory
  59. from ..sequence._sequence_manager_factory import _SequenceManagerFactory
  60. from ..submission._submission_manager_factory import _SubmissionManagerFactory
  61. from ..task._task_manager_factory import _TaskManagerFactory
  62. managers: Dict[str, Type[_Manager]] = {
  63. "scenario": _ScenarioManagerFactory._build_manager(),
  64. "sequence": _SequenceManagerFactory._build_manager(),
  65. "data": _DataManagerFactory._build_manager(),
  66. "cycle": _CycleManagerFactory._build_manager(),
  67. "job": _JobManagerFactory._build_manager(),
  68. "task": _TaskManagerFactory._build_manager(),
  69. "submission": _SubmissionManagerFactory._build_manager(),
  70. }
  71. if EnterpriseEditionUtils._using_enterprise():
  72. _build_enterprise_managers = _load_fct(
  73. EnterpriseEditionUtils._TAIPY_ENTERPRISE_CORE_MODULE + "._entity.utils", "_build_enterprise_managers"
  74. )
  75. managers.update(_build_enterprise_managers())
  76. return managers
  77. @classmethod
  78. @functools.lru_cache
  79. def _get_manager(cls, manager: str) -> Type[_Manager]:
  80. return cls._managers[manager]
  81. def _self_reload(manager: str):
  82. def __reload(fct):
  83. @functools.wraps(fct)
  84. def _do_reload(self, *args, **kwargs):
  85. self = _Reloader()._reload(manager, self)
  86. return fct(self, *args, **kwargs)
  87. return _do_reload
  88. return __reload
  89. def _self_setter(manager):
  90. def __update_entity(fct):
  91. @functools.wraps(fct)
  92. def _do_update_entity(self, *args, **kwargs):
  93. fct(self, *args, **kwargs)
  94. value = args[0] if len(args) == 1 else args
  95. event = _make_event(
  96. self,
  97. EventOperation.UPDATE,
  98. attribute_name=fct.__name__,
  99. attribute_value=value,
  100. )
  101. if not self._is_in_context:
  102. entity = _Reloader()._reload(manager, self)
  103. fct(entity, *args, **kwargs)
  104. _Reloader._get_manager(manager)._update(entity)
  105. Notifier.publish(event)
  106. else:
  107. self._in_context_attributes_changed_collector.append(event)
  108. return _do_update_entity
  109. return __update_entity