_scenario_manager.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import datetime
  12. from functools import partial
  13. from typing import Any, Callable, List, Literal, Optional, Union
  14. from taipy.config import Config
  15. from .._entity._entity_ids import _EntityIds
  16. from .._manager._manager import _Manager
  17. from .._repository._abstract_repository import _AbstractRepository
  18. from .._version._version_mixin import _VersionMixin
  19. from ..common.warn_if_inputs_not_ready import _warn_if_inputs_not_ready
  20. from ..config.scenario_config import ScenarioConfig
  21. from ..cycle._cycle_manager_factory import _CycleManagerFactory
  22. from ..cycle.cycle import Cycle
  23. from ..data._data_manager_factory import _DataManagerFactory
  24. from ..exceptions.exceptions import (
  25. DeletingPrimaryScenario,
  26. DifferentScenarioConfigs,
  27. DoesNotBelongToACycle,
  28. InsufficientScenarioToCompare,
  29. InvalidSequence,
  30. InvalidSscenario,
  31. NonExistingComparator,
  32. NonExistingScenario,
  33. NonExistingScenarioConfig,
  34. SequenceTaskConfigDoesNotExistInSameScenarioConfig,
  35. UnauthorizedTagError,
  36. )
  37. from ..job._job_manager_factory import _JobManagerFactory
  38. from ..job.job import Job
  39. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  40. from ..submission._submission_manager_factory import _SubmissionManagerFactory
  41. from ..submission.submission import Submission
  42. from ..task._task_manager_factory import _TaskManagerFactory
  43. from .scenario import Scenario
  44. from .scenario_id import ScenarioId
  45. class _ScenarioManager(_Manager[Scenario], _VersionMixin):
  46. _AUTHORIZED_TAGS_KEY = "authorized_tags"
  47. _ENTITY_NAME = Scenario.__name__
  48. _EVENT_ENTITY_TYPE = EventEntityType.SCENARIO
  49. _repository: _AbstractRepository
  50. @classmethod
  51. def _get_all(cls, version_number: Optional[str] = None) -> List[Scenario]:
  52. """
  53. Returns all entities.
  54. """
  55. filters = cls._build_filters_with_version(version_number)
  56. return cls._repository._load_all(filters)
  57. @classmethod
  58. def _subscribe(
  59. cls,
  60. callback: Callable[[Scenario, Job], None],
  61. params: Optional[List[Any]] = None,
  62. scenario: Optional[Scenario] = None,
  63. ):
  64. if scenario is None:
  65. scenarios = cls._get_all()
  66. for scn in scenarios:
  67. cls.__add_subscriber(callback, params, scn)
  68. return
  69. cls.__add_subscriber(callback, params, scenario)
  70. @classmethod
  71. def _unsubscribe(
  72. cls,
  73. callback: Callable[[Scenario, Job], None],
  74. params: Optional[List[Any]] = None,
  75. scenario: Optional[Scenario] = None,
  76. ):
  77. if scenario is None:
  78. scenarios = cls._get_all()
  79. for scn in scenarios:
  80. cls.__remove_subscriber(callback, params, scn)
  81. return
  82. cls.__remove_subscriber(callback, params, scenario)
  83. @classmethod
  84. def __add_subscriber(cls, callback, params, scenario: Scenario):
  85. scenario._add_subscriber(callback, params)
  86. Notifier.publish(
  87. _make_event(scenario, EventOperation.UPDATE, attribute_name="subscribers", attribute_value=params)
  88. )
  89. @classmethod
  90. def __remove_subscriber(cls, callback, params, scenario: Scenario):
  91. scenario._remove_subscriber(callback, params)
  92. Notifier.publish(
  93. _make_event(scenario, EventOperation.UPDATE, attribute_name="subscribers", attribute_value=params)
  94. )
  95. @classmethod
  96. def _create(
  97. cls,
  98. config: ScenarioConfig,
  99. creation_date: Optional[datetime.datetime] = None,
  100. name: Optional[str] = None,
  101. ) -> Scenario:
  102. _task_manager = _TaskManagerFactory._build_manager()
  103. _data_manager = _DataManagerFactory._build_manager()
  104. scenario_id = Scenario._new_id(str(config.id))
  105. cycle = (
  106. _CycleManagerFactory._build_manager()._get_or_create(config.frequency, creation_date)
  107. if config.frequency
  108. else None
  109. )
  110. cycle_id = cycle.id if cycle else None
  111. tasks = (
  112. _task_manager._bulk_get_or_create(config.task_configs, cycle_id, scenario_id) if config.task_configs else []
  113. )
  114. additional_data_nodes = (
  115. _data_manager._bulk_get_or_create(config.additional_data_node_configs, cycle_id, scenario_id)
  116. if config.additional_data_node_configs
  117. else {}
  118. )
  119. sequences = {}
  120. tasks_and_config_id_maps = {task.config_id: task for task in tasks}
  121. for sequence_name, sequence_task_configs in config.sequences.items():
  122. sequence_tasks = []
  123. non_existing_sequence_task_config_in_scenario_config = set()
  124. for sequence_task_config in sequence_task_configs:
  125. if task := tasks_and_config_id_maps.get(sequence_task_config.id):
  126. sequence_tasks.append(task)
  127. else:
  128. non_existing_sequence_task_config_in_scenario_config.add(sequence_task_config.id)
  129. if non_existing_sequence_task_config_in_scenario_config:
  130. raise SequenceTaskConfigDoesNotExistInSameScenarioConfig(
  131. list(non_existing_sequence_task_config_in_scenario_config), sequence_name, str(config.id)
  132. )
  133. sequences[sequence_name] = {Scenario._SEQUENCE_TASKS_KEY: sequence_tasks}
  134. is_primary_scenario = len(cls._get_all_by_cycle(cycle)) == 0 if cycle else False
  135. props = config._properties.copy()
  136. if name:
  137. props["name"] = name
  138. version = cls._get_latest_version()
  139. scenario = Scenario(
  140. config_id=str(config.id),
  141. tasks=set(tasks),
  142. properties=props,
  143. additional_data_nodes=set(additional_data_nodes.values()),
  144. scenario_id=scenario_id,
  145. creation_date=creation_date,
  146. is_primary=is_primary_scenario,
  147. cycle=cycle,
  148. version=version,
  149. sequences=sequences,
  150. )
  151. for task in tasks:
  152. if scenario_id not in task._parent_ids:
  153. task._parent_ids.update([scenario_id])
  154. _task_manager._set(task)
  155. for dn in additional_data_nodes.values():
  156. if scenario_id not in dn._parent_ids:
  157. dn._parent_ids.update([scenario_id])
  158. _data_manager._set(dn)
  159. cls._set(scenario)
  160. if not scenario._is_consistent():
  161. raise InvalidSscenario(scenario.id)
  162. actual_sequences = scenario._get_sequences()
  163. for sequence_name in sequences.keys():
  164. if not actual_sequences[sequence_name]._is_consistent():
  165. raise InvalidSequence(actual_sequences[sequence_name].id)
  166. Notifier.publish(_make_event(actual_sequences[sequence_name], EventOperation.CREATION))
  167. Notifier.publish(_make_event(scenario, EventOperation.CREATION))
  168. return scenario
  169. @classmethod
  170. def _is_submittable(cls, scenario: Union[Scenario, ScenarioId]) -> bool:
  171. if isinstance(scenario, str):
  172. scenario = cls._get(scenario)
  173. return isinstance(scenario, Scenario) and scenario.is_ready_to_run()
  174. @classmethod
  175. def _submit(
  176. cls,
  177. scenario: Union[Scenario, ScenarioId],
  178. callbacks: Optional[List[Callable]] = None,
  179. force: bool = False,
  180. wait: bool = False,
  181. timeout: Optional[Union[float, int]] = None,
  182. check_inputs_are_ready: bool = True,
  183. **properties,
  184. ) -> Submission:
  185. scenario_id = scenario.id if isinstance(scenario, Scenario) else scenario
  186. if not isinstance(scenario, Scenario):
  187. scenario = cls._get(scenario_id)
  188. if scenario is None or not cls._exists(scenario_id):
  189. raise NonExistingScenario(scenario_id)
  190. callbacks = callbacks or []
  191. scenario_subscription_callback = cls.__get_status_notifier_callbacks(scenario) + callbacks
  192. if check_inputs_are_ready:
  193. _warn_if_inputs_not_ready(scenario.get_inputs())
  194. submission = (
  195. _TaskManagerFactory._build_manager()
  196. ._orchestrator()
  197. .submit(
  198. scenario,
  199. callbacks=scenario_subscription_callback,
  200. force=force,
  201. wait=wait,
  202. timeout=timeout,
  203. **properties,
  204. )
  205. )
  206. Notifier.publish(_make_event(scenario, EventOperation.SUBMISSION))
  207. return submission
  208. @classmethod
  209. def __get_status_notifier_callbacks(cls, scenario: Scenario) -> List:
  210. return [partial(c.callback, *c.params, scenario) for c in scenario.subscribers]
  211. @classmethod
  212. def _get_primary(cls, cycle: Cycle) -> Optional[Scenario]:
  213. scenarios = cls._get_all_by_cycle(cycle)
  214. for scenario in scenarios:
  215. if scenario.is_primary:
  216. return scenario
  217. return None
  218. @classmethod
  219. def _get_by_tag(cls, cycle: Cycle, tag: str) -> Optional[Scenario]:
  220. scenarios = cls._get_all_by_cycle(cycle)
  221. for scenario in scenarios:
  222. if scenario.has_tag(tag):
  223. return scenario
  224. return None
  225. @classmethod
  226. def _get_all_by_tag(cls, tag: str) -> List[Scenario]:
  227. return [scenario for scenario in cls._get_all() if scenario.has_tag(tag)]
  228. @classmethod
  229. def _get_all_by_cycle(cls, cycle: Cycle) -> List[Scenario]:
  230. filters = cls._build_filters_with_version("all")
  231. if not filters:
  232. filters = [{}]
  233. for fil in filters:
  234. fil.update({"cycle": cycle.id})
  235. return cls._get_all_by(filters)
  236. @classmethod
  237. def _get_primary_scenarios(cls) -> List[Scenario]:
  238. return [scenario for scenario in cls._get_all() if scenario.is_primary]
  239. @classmethod
  240. def _sort_scenarios(
  241. cls,
  242. scenarios: List[Scenario],
  243. descending: bool = False,
  244. sort_key: Literal["name", "id", "config_id", "creation_date", "tags"] = "name",
  245. ) -> List[Scenario]:
  246. if sort_key in ["name", "config_id", "creation_date", "tags"]:
  247. if sort_key == "tags":
  248. scenarios.sort(key=lambda x: (tuple(sorted(x.tags)), x.id), reverse=descending)
  249. else:
  250. scenarios.sort(key=lambda x: (getattr(x, sort_key), x.id), reverse=descending)
  251. elif sort_key == "id":
  252. scenarios.sort(key=lambda x: x.id, reverse=descending)
  253. else:
  254. scenarios.sort(key=lambda x: (x.name, x.id), reverse=descending)
  255. return scenarios
  256. @classmethod
  257. def _is_promotable_to_primary(cls, scenario: Union[Scenario, ScenarioId]) -> bool:
  258. if isinstance(scenario, str):
  259. scenario = cls._get(scenario)
  260. if scenario and not scenario.is_primary and scenario.cycle:
  261. return True
  262. return False
  263. @classmethod
  264. def _set_primary(cls, scenario: Scenario):
  265. if not scenario.cycle:
  266. raise DoesNotBelongToACycle(
  267. f"Can't set scenario {scenario.id} to primary because it doesn't belong to a cycle."
  268. )
  269. primary_scenario = cls._get_primary(scenario.cycle)
  270. # To prevent SAME scenario updating out of Context Manager
  271. if primary_scenario and primary_scenario != scenario:
  272. primary_scenario.is_primary = False # type: ignore
  273. scenario.is_primary = True # type: ignore
  274. @classmethod
  275. def _tag(cls, scenario: Scenario, tag: str):
  276. tags = scenario.properties.get(cls._AUTHORIZED_TAGS_KEY, set())
  277. if len(tags) > 0 and tag not in tags:
  278. raise UnauthorizedTagError(f"Tag `{tag}` not authorized by scenario configuration `{scenario.config_id}`")
  279. if scenario.cycle:
  280. if old_tagged_scenario := cls._get_by_tag(scenario.cycle, tag):
  281. old_tagged_scenario.remove_tag(tag)
  282. cls._set(old_tagged_scenario)
  283. scenario._add_tag(tag)
  284. cls._set(scenario)
  285. Notifier.publish(
  286. _make_event(scenario, EventOperation.UPDATE, attribute_name="tags", attribute_value=scenario.tags)
  287. )
  288. @classmethod
  289. def _untag(cls, scenario: Scenario, tag: str):
  290. scenario._remove_tag(tag)
  291. cls._set(scenario)
  292. Notifier.publish(
  293. _make_event(scenario, EventOperation.UPDATE, attribute_name="tags", attribute_value=scenario.tags)
  294. )
  295. @classmethod
  296. def _compare(cls, *scenarios: Scenario, data_node_config_id: Optional[str] = None):
  297. if len(scenarios) < 2:
  298. raise InsufficientScenarioToCompare("At least two scenarios are required to compare.")
  299. if not all(scenarios[0].config_id == scenario.config_id for scenario in scenarios):
  300. raise DifferentScenarioConfigs("Scenarios to compare must have the same configuration.")
  301. if scenario_config := _ScenarioManager.__get_config(scenarios[0]):
  302. results = {}
  303. if data_node_config_id:
  304. if data_node_config_id in scenario_config.comparators.keys():
  305. dn_comparators = {data_node_config_id: scenario_config.comparators[data_node_config_id]}
  306. else:
  307. raise NonExistingComparator(f"Data node config {data_node_config_id} has no comparator.")
  308. else:
  309. dn_comparators = scenario_config.comparators
  310. for data_node_config_id, comparators in dn_comparators.items():
  311. data_nodes = [scenario.__getattr__(data_node_config_id).read() for scenario in scenarios]
  312. results[data_node_config_id] = {
  313. comparator.__name__: comparator(*data_nodes) for comparator in comparators
  314. }
  315. return results
  316. else:
  317. raise NonExistingScenarioConfig(scenarios[0].config_id)
  318. @staticmethod
  319. def __get_config(scenario: Scenario):
  320. return Config.scenarios.get(scenario.config_id, None)
  321. @classmethod
  322. def _is_deletable(cls, scenario: Union[Scenario, ScenarioId]) -> bool:
  323. if isinstance(scenario, str):
  324. scenario = cls._get(scenario)
  325. if scenario.is_primary:
  326. if len(cls._get_all_by_cycle(scenario.cycle)) > 1:
  327. return False
  328. return True
  329. @classmethod
  330. def _delete(cls, scenario_id: ScenarioId):
  331. scenario = cls._get(scenario_id)
  332. if not cls._is_deletable(scenario):
  333. raise DeletingPrimaryScenario(
  334. f"Scenario {scenario.id}, which has config id {scenario.config_id}, is primary and there are "
  335. f"other scenarios in the same cycle. "
  336. )
  337. if scenario.is_primary:
  338. _CycleManagerFactory._build_manager()._delete(scenario.cycle.id)
  339. super()._delete(scenario_id)
  340. @classmethod
  341. def _hard_delete(cls, scenario_id: ScenarioId):
  342. scenario = cls._get(scenario_id)
  343. if not cls._is_deletable(scenario):
  344. raise DeletingPrimaryScenario(
  345. f"Scenario {scenario.id}, which has config id {scenario.config_id}, is primary and there are "
  346. f"other scenarios in the same cycle. "
  347. )
  348. if scenario.is_primary:
  349. _CycleManagerFactory._build_manager()._hard_delete(scenario.cycle.id)
  350. else:
  351. entity_ids_to_delete = cls._get_children_entity_ids(scenario)
  352. entity_ids_to_delete.scenario_ids.add(scenario.id)
  353. cls._delete_entities_of_multiple_types(entity_ids_to_delete)
  354. @classmethod
  355. def _delete_by_version(cls, version_number: str):
  356. """
  357. Deletes scenario by the version number.
  358. Check if the cycle is only attached to this scenario, then delete it.
  359. """
  360. for scenario in cls._repository._search("version", version_number):
  361. if scenario.cycle and len(cls._get_all_by_cycle(scenario.cycle)) == 1:
  362. _CycleManagerFactory._build_manager()._delete(scenario.cycle.id)
  363. super()._delete(scenario.id)
  364. @classmethod
  365. def _get_children_entity_ids(cls, scenario: Scenario) -> _EntityIds:
  366. entity_ids = _EntityIds()
  367. for sequence in scenario.sequences.values():
  368. if sequence.owner_id == scenario.id:
  369. entity_ids.sequence_ids.add(sequence.id)
  370. for task in scenario.tasks.values():
  371. if task.owner_id == scenario.id:
  372. entity_ids.task_ids.add(task.id)
  373. for data_node in scenario.data_nodes.values():
  374. if data_node.owner_id == scenario.id:
  375. entity_ids.data_node_ids.add(data_node.id)
  376. jobs = _JobManagerFactory._build_manager()._get_all()
  377. for job in jobs:
  378. if job.task.id in entity_ids.task_ids:
  379. entity_ids.job_ids.add(job.id)
  380. submissions = _SubmissionManagerFactory._build_manager()._get_all()
  381. submitted_entity_ids = list(entity_ids.scenario_ids.union(entity_ids.sequence_ids, entity_ids.task_ids))
  382. for submission in submissions:
  383. if submission.entity_id in submitted_entity_ids or submission.entity_id == scenario.id:
  384. entity_ids.submission_ids.add(submission.id)
  385. return entity_ids
  386. @classmethod
  387. def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[Scenario]:
  388. """
  389. Get all scenarios by its config id.
  390. """
  391. filters = cls._build_filters_with_version(version_number)
  392. if not filters:
  393. filters = [{}]
  394. for fil in filters:
  395. fil.update({"config_id": config_id})
  396. return cls._repository._load_all(filters)