_scenario_duplicator.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from datetime import datetime
  12. from typing import Dict, Optional, Set, Union
  13. from taipy.common.config import Config
  14. from ..common._listattributes import _ListAttributes
  15. from ..common.scope import Scope
  16. from ..cycle._cycle_manager_factory import _CycleManagerFactory
  17. from ..data._data_duplicator import _DataDuplicator
  18. from ..data._data_manager_factory import _DataManagerFactory
  19. from ..data.data_node import DataNode
  20. from ..notification import EventOperation, Notifier, _make_event
  21. from ..sequence.sequence import Sequence
  22. from ..task._task_manager_factory import _TaskManagerFactory
  23. from ..task.task import Task
  24. from .scenario import Scenario
  25. class _ScenarioDuplicator:
  26. """A service to duplicate a scenario and related entities."""
  27. def __init__(self, scenario: Scenario, data_to_duplicate: Union[bool, Set[str]] = True):
  28. self.scenario: Scenario = scenario
  29. if data_to_duplicate is True:
  30. self.data_to_duplicate: Set[str] = set(self.scenario.data_nodes.keys())
  31. elif isinstance(data_to_duplicate, set):
  32. self.data_to_duplicate = data_to_duplicate
  33. else:
  34. self.data_to_duplicate = set()
  35. self.new_scenario: Scenario = None # type: ignore
  36. self.new_cycle_id: Optional[str] = None
  37. self.new_tasks: Dict[str, Task] = {}
  38. self.new_data_nodes: Dict[str, DataNode] = {}
  39. from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
  40. self.__scenario_manager = _ScenarioManagerFactory._build_manager()
  41. self.__cycle_manager = _CycleManagerFactory._build_manager()
  42. self.__task_manager = _TaskManagerFactory._build_manager()
  43. self.__data_manager = _DataManagerFactory._build_manager()
  44. def duplicate(self, new_creation_date: Optional[datetime] = None, new_name: Optional[str] = None) -> Scenario:
  45. """Create a duplicated scenario with its related entities
  46. Create a scenario with the same configuration as the original scenario, but with
  47. a new creation date and name. Creation events are published for the new scenario,
  48. tasks, and data nodes. The data nodes are duplicated if the `data_to_duplicate`
  49. is set to True or a set of data node configuration ids. The new scenario is returned.
  50. Arguments:
  51. new_creation_date (Optional[datetime]): The creation date of the new scenario.
  52. If not provided, the current date and time is used.
  53. new_name (Optional[str]): The name of the new scenario. If not provided, the
  54. name of the original scenario is used.
  55. Returns:
  56. The newly created scenario.
  57. """
  58. self.new_scenario = self.__init_new_scenario(new_creation_date or datetime.now(), new_name)
  59. for dn in self.scenario.additional_data_nodes.values():
  60. self.new_scenario._additional_data_nodes.add(self._duplicate_datanode(dn).id) # type: ignore
  61. for task in self.scenario.tasks.values():
  62. self.new_scenario._tasks.add(self._duplicate_task(task).id) # type: ignore
  63. self._duplicate_sequences()
  64. self.__scenario_manager._repository._save(self.new_scenario)
  65. Notifier.publish(_make_event(self.new_scenario, EventOperation.CREATION))
  66. return self.new_scenario
  67. def _duplicate_task(self, task: Task) -> Task:
  68. if task.scope == Scope.GLOBAL:
  69. # Task and children data nodes already exist. No need to duplicate.
  70. self.new_tasks[task.config_id] = task
  71. task._parent_ids.update([self.new_scenario.id])
  72. self.__task_manager._repository._save(task) # Through the repository so we don't set data nodes
  73. Notifier.publish(_make_event(task, EventOperation.UPDATE, "parent_ids", task._parent_ids))
  74. return task
  75. if task.scope == Scope.CYCLE and self.scenario.cycle.id == self.new_cycle_id:
  76. # Task and children data nodes already exist. No need to duplicate.
  77. self.new_tasks[task.config_id] = task
  78. task._parent_ids.update([self.new_scenario.id])
  79. self.__task_manager._repository._save(task) # Through the repository so we don't set data nodes
  80. Notifier.publish(_make_event(task, EventOperation.UPDATE, "parent_ids", task._parent_ids))
  81. return task
  82. if task.scope == Scope.CYCLE:
  83. existing_tasks = self.__task_manager._repository._get_by_configs_and_owner_ids( # type: ignore
  84. [(task.config_id, self.new_cycle_id)], self.__task_manager._build_filters_with_version(None)
  85. )
  86. if existing_tasks:
  87. # Task and children data nodes already exist. No need to duplicate.
  88. existing_t = existing_tasks[(task.config_id, self.new_cycle_id)]
  89. self.new_tasks[task.config_id] = existing_t
  90. existing_t._parent_ids.update([self.new_scenario.id])
  91. self.__task_manager._repository._save(existing_t) # Don't set data nodes
  92. Notifier.publish(_make_event(existing_t, EventOperation.UPDATE, "parent_ids", existing_t._parent_ids))
  93. return existing_t
  94. new_task = self.__init_new_task(task)
  95. for input in task.input.values():
  96. new_task._input[input.config_id] = self._duplicate_datanode(input, new_task)
  97. for output in task.output.values():
  98. new_task._output[output.config_id] = self._duplicate_datanode(output, new_task)
  99. self.new_tasks[task.config_id] = new_task
  100. self.__task_manager._repository._save(new_task)
  101. Notifier.publish(_make_event(new_task, EventOperation.CREATION))
  102. return new_task
  103. def _duplicate_datanode(self, dn: DataNode, task: Optional[Task] = None) -> DataNode:
  104. if dn.config_id in self.new_data_nodes:
  105. # Data node already created from another task. No need to duplicate.
  106. new_dn = self.new_data_nodes[dn.config_id]
  107. new_dn._parent_ids.update([task.id]) if task else new_dn._parent_ids.update([self.new_scenario.id])
  108. self.__data_manager._repository._save(new_dn)
  109. Notifier.publish(_make_event(new_dn, EventOperation.UPDATE, "parent_ids", new_dn._parent_ids))
  110. return new_dn
  111. if dn.scope == Scope.GLOBAL:
  112. # Data node already exists. No need to duplicate.
  113. dn._parent_ids.update([task.id]) if task else dn._parent_ids.update([self.new_scenario.id])
  114. self.__data_manager._update(dn)
  115. Notifier.publish(_make_event(dn, EventOperation.UPDATE, "parent_ids", dn._parent_ids))
  116. return dn
  117. if dn.scope == Scope.CYCLE and self.scenario.cycle.id == self.new_cycle_id:
  118. # Data node already exists. No need to duplicate.
  119. dn._parent_ids.update([task.id]) if task else dn._parent_ids.update([self.new_scenario.id])
  120. self.__data_manager._update(dn)
  121. Notifier.publish(_make_event(dn, EventOperation.UPDATE, "parent_ids", dn._parent_ids))
  122. return dn
  123. if dn.scope == Scope.CYCLE:
  124. existing_dns = self.__data_manager._repository._get_by_configs_and_owner_ids( # type: ignore
  125. [(dn.config_id, self.new_cycle_id)], self.__data_manager._build_filters_with_version(None)
  126. )
  127. if existing_dns.get((dn.config_id, self.new_cycle_id)):
  128. ex_dn = existing_dns[(dn.config_id, self.new_cycle_id)]
  129. # A cycle data node with same config and same cycle owner already exist. No need to duplicate it.
  130. ex_dn._parent_ids.update([task.id]) if task else ex_dn._parent_ids.update([self.new_scenario.id])
  131. self.__data_manager._update(ex_dn)
  132. Notifier.publish(_make_event(ex_dn, EventOperation.UPDATE, "parent_ids", ex_dn._parent_ids))
  133. return ex_dn
  134. new_dn = self.__init_new_datanode(dn, task)
  135. if new_dn._config_id in self.data_to_duplicate:
  136. duplicator = _DataDuplicator(dn)
  137. if duplicator.can_duplicate():
  138. duplicator.duplicate_data(new_dn)
  139. self.new_data_nodes[dn.config_id] = new_dn
  140. self.__data_manager._repository._save(new_dn)
  141. Notifier.publish(_make_event(new_dn, EventOperation.CREATION))
  142. return new_dn
  143. def _duplicate_sequences(self):
  144. new_sequences = {}
  145. for seq_name, seq_data in self.scenario._sequences.items():
  146. new_sequence_id = Sequence._new_id(seq_name, self.new_scenario.id)
  147. new_sequence = {
  148. Scenario._SEQUENCE_PROPERTIES_KEY: seq_data[Scenario._SEQUENCE_PROPERTIES_KEY],
  149. Scenario._SEQUENCE_TASKS_KEY: [],
  150. } # We do not want to duplicate the subscribers
  151. for task in seq_data[Scenario._SEQUENCE_TASKS_KEY]:
  152. new_task = self.new_tasks[task.config_id]
  153. new_task._parent_ids.update([new_sequence_id])
  154. self.__task_manager._repository._save(new_task)
  155. new_sequence[Scenario._SEQUENCE_TASKS_KEY].append(self.new_tasks[task.config_id])
  156. new_sequences[seq_name] = new_sequence
  157. self.new_scenario._sequences = new_sequences
  158. def __init_new_scenario(self, new_creation_date: datetime, new_name: Optional[str]) -> Scenario:
  159. self.new_scenario = self.__scenario_manager._get(self.scenario)
  160. self.new_scenario.id = self.new_scenario._new_id(self.scenario.config_id)
  161. self.__scenario_manager._repository._save(self.new_scenario)
  162. self.new_scenario._creation_date = new_creation_date
  163. if frequency := Config.scenarios[self.scenario.config_id].frequency:
  164. cycle = self.__cycle_manager._get_or_create(frequency, new_creation_date)
  165. self.new_scenario._cycle = cycle
  166. self.new_scenario._primary_scenario = len(self.__scenario_manager._get_all_by_cycle(cycle)) == 0
  167. self.new_cycle_id = cycle.id
  168. else:
  169. self.new_scenario._primary_scenario = False
  170. if hasattr(self.new_scenario._properties, "_entity_owner"):
  171. self.new_scenario._properties._entity_owner = self.new_scenario
  172. if new_name:
  173. self.new_scenario._properties["name"] = new_name
  174. self.new_scenario._subscribers = _ListAttributes(self.new_scenario, [])
  175. self.new_scenario._tasks = set() # To be potentially updated later
  176. self.new_scenario._sequences = {} # To be potentially updated later
  177. self.new_scenario._additional_data_nodes = set() # To be potentially updated later
  178. return self.new_scenario
  179. def __init_new_task(self, task: Task) -> Task:
  180. new_task = self.__task_manager._get(task)
  181. new_task.id = new_task._new_id(task.config_id)
  182. new_task._owner_id = self.__task_manager._get_owner_id(task.scope, self.new_cycle_id, self.new_scenario.id)
  183. new_task._parent_ids = {self.new_scenario.id}
  184. if hasattr(new_task._properties, "_entity_owner"):
  185. new_task._properties._entity_owner = new_task
  186. new_task._input = {} # To be potentially updated later
  187. new_task._output = {} # To be potentially updated later
  188. return new_task
  189. def __init_new_datanode(self, dn: DataNode, task: Optional[Task] = None) -> DataNode:
  190. new_dn = self.__data_manager._get(dn)
  191. new_dn.id = DataNode._new_id(dn._config_id)
  192. new_dn._owner_id = self.new_scenario.id if dn.scope == Scope.SCENARIO else self.new_cycle_id
  193. new_dn._parent_ids = {task.id} if task else {self.new_scenario.id}
  194. if hasattr(new_dn._properties, "_entity_owner"):
  195. new_dn._properties._entity_owner = new_dn
  196. new_dn._last_edit_date = None # To be potentially updated later
  197. new_dn._edits = [] # To be potentially updated later
  198. return new_dn