scenario.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from __future__ import annotations
  12. import pathlib
  13. import uuid
  14. from datetime import datetime
  15. from typing import Any, Callable, Dict, List, Optional, Set, Union
  16. import networkx as nx
  17. from taipy.config.common._template_handler import _TemplateHandler as _tpl
  18. from taipy.config.common._validate_id import _validate_id
  19. from .._entity._entity import _Entity
  20. from .._entity._labeled import _Labeled
  21. from .._entity._properties import _Properties
  22. from .._entity._reload import _Reloader, _self_reload, _self_setter
  23. from .._entity.submittable import Submittable
  24. from .._version._version_manager_factory import _VersionManagerFactory
  25. from ..common._listattributes import _ListAttributes
  26. from ..common._utils import _Subscriber
  27. from ..cycle.cycle import Cycle
  28. from ..data.data_node import DataNode
  29. from ..data.data_node_id import DataNodeId
  30. from ..exceptions.exceptions import (
  31. InvalidSequence,
  32. NonExistingDataNode,
  33. NonExistingSequence,
  34. NonExistingTask,
  35. SequenceAlreadyExists,
  36. SequenceTaskDoesNotExistInScenario,
  37. )
  38. from ..job.job import Job
  39. from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
  40. from ..sequence.sequence import Sequence
  41. from ..submission.submission import Submission
  42. from ..task.task import Task
  43. from ..task.task_id import TaskId
  44. from .scenario_id import ScenarioId
  45. class Scenario(_Entity, Submittable, _Labeled):
  46. """Instance of a Business case to solve.
  47. A scenario holds a set of tasks (instances of `Task^` class) to submit for execution in order to
  48. solve the Business case. It also holds a set of additional data nodes (instances of `DataNode^` class)
  49. for extra data related to the scenario.
  50. Attributes:
  51. config_id (str): The identifier of the `ScenarioConfig^`.
  52. tasks (Set[Task^]): The set of tasks.
  53. additional_data_nodes (Set[DataNode^]): The set of additional data nodes.
  54. sequences (Dict[str, Sequence^]): The dictionary of sequences: subsets of tasks that can be submitted
  55. together independently of the rest of the scenario's tasks.
  56. properties (dict[str, Any]): A dictionary of additional properties.
  57. scenario_id (str): The unique identifier of this scenario.
  58. creation_date (datetime): The date and time of the scenario's creation.
  59. is_primary (bool): True if the scenario is the primary of its cycle. False otherwise.
  60. cycle (Cycle^): The cycle of the scenario.
  61. subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
  62. tags (Set[str]): The set of the scenario's tags.
  63. version (str): The string indicates the application version of the scenario to instantiate.
  64. If not provided, the latest version is used.
  65. """
  66. _ID_PREFIX = "SCENARIO"
  67. _MANAGER_NAME = "scenario"
  68. _MIGRATED_SEQUENCES_KEY = "sequences"
  69. __SEPARATOR = "_"
  70. _SEQUENCE_TASKS_KEY = "tasks"
  71. _SEQUENCE_PROPERTIES_KEY = "properties"
  72. _SEQUENCE_SUBSCRIBERS_KEY = "subscribers"
  73. def __init__(
  74. self,
  75. config_id: str,
  76. tasks: Optional[Union[Set[TaskId], Set[Task]]],
  77. properties: Dict[str, Any],
  78. additional_data_nodes: Optional[Union[Set[DataNodeId], Set[DataNode]]] = None,
  79. scenario_id: Optional[ScenarioId] = None,
  80. creation_date: Optional[datetime] = None,
  81. is_primary: bool = False,
  82. cycle: Optional[Cycle] = None,
  83. subscribers: Optional[List[_Subscriber]] = None,
  84. tags: Optional[Set[str]] = None,
  85. version: str = None,
  86. sequences: Optional[Dict[str, Dict]] = None,
  87. ):
  88. super().__init__(subscribers or [])
  89. self._config_id = _validate_id(config_id)
  90. self.id: ScenarioId = scenario_id or self._new_id(self.config_id)
  91. self._tasks: Union[Set[TaskId], Set[Task], Set] = tasks or set()
  92. self._additional_data_nodes: Union[Set[DataNodeId], Set[DataNode], Set] = additional_data_nodes or set()
  93. self._creation_date = creation_date or datetime.now()
  94. self._cycle = cycle
  95. self._primary_scenario = is_primary
  96. self._tags = tags or set()
  97. self._properties = _Properties(self, **properties)
  98. self._sequences: Dict[str, Dict] = sequences or {}
  99. _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in self._tasks}
  100. for sequence_name, sequence_data in self._sequences.items():
  101. sequence_task_ids = {task.id if isinstance(task, Task) else task for task in sequence_data.get("tasks", [])}
  102. self.__check_sequence_tasks_exist_in_scenario_tasks(
  103. sequence_name, sequence_task_ids, self.id, _scenario_task_ids
  104. )
  105. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  106. @staticmethod
  107. def _new_id(config_id: str) -> ScenarioId:
  108. """Generate a unique scenario identifier."""
  109. return ScenarioId(Scenario.__SEPARATOR.join([Scenario._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())]))
  110. def __getstate__(self):
  111. return self.id
  112. def __setstate__(self, id):
  113. from ... import core as tp
  114. sc = tp.get(id)
  115. self.__dict__ = sc.__dict__
  116. def __hash__(self):
  117. return hash(self.id)
  118. def __eq__(self, other):
  119. return self.id == other.id
  120. def __getattr__(self, attribute_name):
  121. protected_attribute_name = _validate_id(attribute_name)
  122. if protected_attribute_name in self._properties:
  123. return _tpl._replace_templates(self._properties[protected_attribute_name])
  124. sequences = self._get_sequences()
  125. if protected_attribute_name in sequences:
  126. return sequences[protected_attribute_name]
  127. tasks = self.tasks
  128. if protected_attribute_name in tasks:
  129. return tasks[protected_attribute_name]
  130. data_nodes = self.data_nodes
  131. if protected_attribute_name in data_nodes:
  132. return data_nodes[protected_attribute_name]
  133. raise AttributeError(f"{attribute_name} is not an attribute of scenario {self.id}")
  134. @property
  135. def config_id(self):
  136. return self._config_id
  137. @property # type: ignore
  138. @_self_reload(_MANAGER_NAME)
  139. def sequences(self) -> Dict[str, Sequence]:
  140. return self._get_sequences()
  141. @sequences.setter # type: ignore
  142. @_self_setter(_MANAGER_NAME)
  143. def sequences(
  144. self, sequences: Dict[str, Dict[str, Union[List[Task], List[TaskId], _ListAttributes, List[_Subscriber], Dict]]]
  145. ):
  146. self._sequences = sequences
  147. actual_sequences = self._get_sequences()
  148. for sequence_name in sequences.keys():
  149. if not actual_sequences[sequence_name]._is_consistent():
  150. raise InvalidSequence(actual_sequences[sequence_name].id)
  151. def add_sequence(
  152. self,
  153. name: str,
  154. tasks: Union[List[Task], List[TaskId]],
  155. properties: Optional[Dict] = None,
  156. subscribers: Optional[List[_Subscriber]] = None,
  157. ):
  158. """Add a sequence to the scenario.
  159. Parameters:
  160. name (str): The name of the sequence.
  161. tasks (Union[List[Task], List[TaskId]]): The list of scenario's tasks to add to the sequence.
  162. properties (Optional[Dict]): The optional properties of the sequence.
  163. subscribers (Optional[List[_Subscriber]]): The optional list of callbacks to be called on
  164. `Job^`'s status change.
  165. Raises:
  166. SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
  167. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  168. """
  169. if name in self.sequences:
  170. raise SequenceAlreadyExists(name, self.id)
  171. seq = self._set_sequence(name, tasks, properties, subscribers)
  172. Notifier.publish(_make_event(seq, EventOperation.CREATION))
  173. def update_sequence(
  174. self,
  175. name: str,
  176. tasks: Union[List[Task], List[TaskId]],
  177. properties: Optional[Dict] = None,
  178. subscribers: Optional[List[_Subscriber]] = None,
  179. ):
  180. """Update an existing sequence.
  181. Parameters:
  182. name (str): The name of the sequence to update.
  183. tasks (Union[List[Task], List[TaskId]]): The new list of scenario's tasks.
  184. properties (Optional[Dict]): The new properties of the sequence.
  185. subscribers (Optional[List[_Subscriber]]): The new list of callbacks to be called on `Job^`'s status change.
  186. Raises:
  187. SequenceTaskDoesNotExistInScenario^: If a task in the list does not exist in the scenario.
  188. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  189. """
  190. if name not in self.sequences:
  191. raise NonExistingSequence(name, self.id)
  192. seq = self._set_sequence(name, tasks, properties, subscribers)
  193. Notifier.publish(_make_event(seq, EventOperation.UPDATE))
  194. def _set_sequence(
  195. self,
  196. name: str,
  197. tasks: Union[List[Task], List[TaskId]],
  198. properties: Optional[Dict] = None,
  199. subscribers: Optional[List[_Subscriber]] = None,
  200. ) -> Sequence:
  201. _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
  202. _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
  203. _sequence_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
  204. self.__check_sequence_tasks_exist_in_scenario_tasks(name, _sequence_task_ids, self.id, _scenario_task_ids)
  205. from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
  206. seq_manager = _SequenceManagerFactory._build_manager()
  207. seq = seq_manager._create(name, tasks, subscribers or [], properties or {}, self.id, self.version)
  208. if not seq._is_consistent():
  209. raise InvalidSequence(name)
  210. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  211. _sequences.update(
  212. {
  213. name: {
  214. self._SEQUENCE_TASKS_KEY: tasks,
  215. self._SEQUENCE_PROPERTIES_KEY: properties or {},
  216. self._SEQUENCE_SUBSCRIBERS_KEY: subscribers or [],
  217. }
  218. }
  219. )
  220. self.sequences = _sequences # type: ignore
  221. return seq
  222. def add_sequences(self, sequences: Dict[str, Union[List[Task], List[TaskId]]]):
  223. """Add multiple sequences to the scenario.
  224. Note:
  225. To provide properties and subscribers for the sequences, use `Scenario.add_sequence^` instead.
  226. Parameters:
  227. sequences (Dict[str, Union[List[Task], List[TaskId]]]):
  228. A dictionary containing sequences to add. Each key is a sequence name, and the value must
  229. be a list of the scenario tasks.
  230. Raises:
  231. SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
  232. """
  233. _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
  234. _sc_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
  235. for name, tasks in sequences.items():
  236. _seq_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
  237. self.__check_sequence_tasks_exist_in_scenario_tasks(name, _seq_task_ids, self.id, _sc_task_ids)
  238. # Need to parse twice the sequences to avoid adding some sequences and not others in case of exception
  239. for name, tasks in sequences.items():
  240. self.add_sequence(name, tasks)
  241. def remove_sequence(self, name: str):
  242. """Remove a sequence from the scenario.
  243. Parameters:
  244. name (str): The name of the sequence to remove.
  245. """
  246. seq_id = self.sequences[name].id
  247. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  248. _sequences.pop(name)
  249. self.sequences = _sequences # type: ignore
  250. Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
  251. def remove_sequences(self, sequence_names: List[str]):
  252. """
  253. Remove multiple sequences from the scenario.
  254. Parameters:
  255. sequence_names (List[str]): A list of sequence names to remove.
  256. """
  257. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  258. for sequence_name in sequence_names:
  259. seq_id = self.sequences[sequence_name].id
  260. _sequences.pop(sequence_name)
  261. Notifier.publish(
  262. Event(
  263. EventEntityType.SEQUENCE,
  264. EventOperation.DELETION,
  265. entity_id=seq_id,
  266. )
  267. )
  268. self.sequences = _sequences # type: ignore
  269. def rename_sequence(self, old_name, new_name):
  270. """Rename a sequence of the scenario.
  271. Parameters:
  272. old_name (str): The current name of the sequence to rename.
  273. new_name (str): The new name of the sequence.
  274. Raises:
  275. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  276. """
  277. if old_name == new_name:
  278. return
  279. if new_name in self.sequences:
  280. raise SequenceAlreadyExists(new_name, self.id)
  281. self._sequences[new_name] = self._sequences[old_name]
  282. del self._sequences[old_name]
  283. self.sequences = self._sequences # type: ignore
  284. Notifier.publish(
  285. Event(
  286. EventEntityType.SCENARIO,
  287. EventOperation.UPDATE,
  288. entity_id=self.id,
  289. attribute_name="sequences",
  290. attribute_value=self._sequences,
  291. )
  292. )
  293. @staticmethod
  294. def __check_sequence_tasks_exist_in_scenario_tasks(
  295. sequence_name: str, sequence_task_ids: Set[TaskId], scenario_id: ScenarioId, scenario_task_ids: Set[TaskId]
  296. ):
  297. non_existing_sequence_task_ids_in_scenario = set()
  298. for sequence_task_id in sequence_task_ids:
  299. if sequence_task_id not in scenario_task_ids:
  300. non_existing_sequence_task_ids_in_scenario.add(sequence_task_id)
  301. if len(non_existing_sequence_task_ids_in_scenario) > 0:
  302. raise SequenceTaskDoesNotExistInScenario(
  303. list(non_existing_sequence_task_ids_in_scenario), sequence_name, scenario_id
  304. )
  305. def _get_sequences(self) -> Dict[str, Sequence]:
  306. _sequences = {}
  307. from ..sequence._sequence_manager_factory import _SequenceManagerFactory
  308. sequence_manager = _SequenceManagerFactory._build_manager()
  309. for sequence_name, sequence_data in self._sequences.items():
  310. p = sequence_manager._create(
  311. sequence_name,
  312. sequence_data.get(self._SEQUENCE_TASKS_KEY, []),
  313. sequence_data.get(self._SEQUENCE_SUBSCRIBERS_KEY, []),
  314. sequence_data.get(self._SEQUENCE_PROPERTIES_KEY, {}),
  315. self.id,
  316. self.version,
  317. )
  318. if not isinstance(p, Sequence):
  319. raise NonExistingSequence(sequence_name, self.id)
  320. _sequences[sequence_name] = p
  321. return _sequences
  322. @property # type: ignore
  323. @_self_reload(_MANAGER_NAME)
  324. def tasks(self) -> Dict[str, Task]:
  325. return self.__get_tasks()
  326. def __get_tasks(self) -> Dict[str, Task]:
  327. from ..task._task_manager_factory import _TaskManagerFactory
  328. _tasks = {}
  329. task_manager = _TaskManagerFactory._build_manager()
  330. for task_or_id in self._tasks:
  331. t = task_manager._get(task_or_id, task_or_id)
  332. if not isinstance(t, Task):
  333. raise NonExistingTask(task_or_id)
  334. _tasks[t.config_id] = t
  335. return _tasks
  336. @tasks.setter # type: ignore
  337. @_self_setter(_MANAGER_NAME)
  338. def tasks(self, val: Union[Set[TaskId], Set[Task]]):
  339. self._tasks = set(val)
  340. @property # type: ignore
  341. @_self_reload(_MANAGER_NAME)
  342. def additional_data_nodes(self) -> Dict[str, DataNode]:
  343. return self.__get_additional_data_nodes()
  344. def __get_additional_data_nodes(self):
  345. from ..data._data_manager_factory import _DataManagerFactory
  346. additional_data_nodes = {}
  347. data_manager = _DataManagerFactory._build_manager()
  348. for dn_or_id in self._additional_data_nodes:
  349. dn = data_manager._get(dn_or_id, dn_or_id)
  350. if not isinstance(dn, DataNode):
  351. raise NonExistingDataNode(dn_or_id)
  352. additional_data_nodes[dn.config_id] = dn
  353. return additional_data_nodes
  354. @additional_data_nodes.setter # type: ignore
  355. @_self_setter(_MANAGER_NAME)
  356. def additional_data_nodes(self, val: Union[Set[TaskId], Set[DataNode]]):
  357. self._additional_data_nodes = set(val)
  358. def _get_set_of_tasks(self) -> Set[Task]:
  359. return set(self.tasks.values())
  360. @property # type: ignore
  361. @_self_reload(_MANAGER_NAME)
  362. def data_nodes(self) -> Dict[str, DataNode]:
  363. data_nodes_dict = self.__get_additional_data_nodes()
  364. for _, task in self.__get_tasks().items():
  365. data_nodes_dict.update(task.data_nodes)
  366. return data_nodes_dict
  367. @property # type: ignore
  368. @_self_reload(_MANAGER_NAME)
  369. def creation_date(self):
  370. return self._creation_date
  371. @creation_date.setter # type: ignore
  372. @_self_setter(_MANAGER_NAME)
  373. def creation_date(self, val):
  374. self._creation_date = val
  375. @property # type: ignore
  376. @_self_reload(_MANAGER_NAME)
  377. def cycle(self):
  378. return self._cycle
  379. @cycle.setter # type: ignore
  380. @_self_setter(_MANAGER_NAME)
  381. def cycle(self, val):
  382. self._cycle = val
  383. @property # type: ignore
  384. @_self_reload(_MANAGER_NAME)
  385. def is_primary(self):
  386. return self._primary_scenario
  387. @is_primary.setter # type: ignore
  388. @_self_setter(_MANAGER_NAME)
  389. def is_primary(self, val):
  390. self._primary_scenario = val
  391. @property # type: ignore
  392. @_self_reload(_MANAGER_NAME)
  393. def subscribers(self):
  394. return self._subscribers
  395. @subscribers.setter # type: ignore
  396. @_self_setter(_MANAGER_NAME)
  397. def subscribers(self, val):
  398. self._subscribers = _ListAttributes(self, val)
  399. @property # type: ignore
  400. @_self_reload(_MANAGER_NAME)
  401. def tags(self):
  402. return self._tags
  403. @tags.setter # type: ignore
  404. @_self_setter(_MANAGER_NAME)
  405. def tags(self, val):
  406. self._tags = val or set()
  407. @property
  408. def version(self):
  409. return self._version
  410. @property
  411. def owner_id(self):
  412. return self._cycle.id
  413. @property
  414. def properties(self):
  415. self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
  416. return self._properties
  417. @property # type: ignore
  418. def name(self) -> Optional[str]:
  419. return self.properties.get("name")
  420. @name.setter # type: ignore
  421. def name(self, val):
  422. self.properties["name"] = val
  423. def has_tag(self, tag: str) -> bool:
  424. """Indicate if the scenario has a given tag.
  425. Parameters:
  426. tag (str): The tag to search among the set of scenario's tags.
  427. Returns:
  428. True if the scenario has the tag given as parameter. False otherwise.
  429. """
  430. return tag in self.tags
  431. def _add_tag(self, tag: str):
  432. self._tags = _Reloader()._reload("scenario", self)._tags
  433. self._tags.add(tag)
  434. def _remove_tag(self, tag: str):
  435. self._tags = _Reloader()._reload("scenario", self)._tags
  436. if self.has_tag(tag):
  437. self._tags.remove(tag)
  438. def subscribe(
  439. self,
  440. callback: Callable[[Scenario, Job], None],
  441. params: Optional[List[Any]] = None,
  442. ):
  443. """Subscribe a function to be called on `Job^` status change.
  444. The subscription is applied to all jobs created from the scenario's execution.
  445. Parameters:
  446. callback (Callable[[Scenario^, Job^], None]): The callable function to be called
  447. on status change.
  448. params (Optional[List[Any]]): The parameters to be passed to the _callback_.
  449. Note:
  450. Notification will be available only for jobs created after this subscription.
  451. """
  452. from ... import core as tp
  453. return tp.subscribe_scenario(callback, params, self)
  454. def unsubscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None):
  455. """Unsubscribe a function that is called when the status of a `Job^` changes.
  456. Parameters:
  457. callback (Callable[[Scenario^, Job^], None]): The callable function to unsubscribe.
  458. params (Optional[List[Any]]): The parameters to be passed to the _callback_.
  459. Note:
  460. The function will continue to be called for ongoing jobs.
  461. """
  462. from ... import core as tp
  463. return tp.unsubscribe_scenario(callback, params, self)
  464. def submit(
  465. self,
  466. callbacks: Optional[List[Callable]] = None,
  467. force: bool = False,
  468. wait: bool = False,
  469. timeout: Optional[Union[float, int]] = None,
  470. **properties,
  471. ) -> Submission:
  472. """Submit this scenario for execution.
  473. All the `Task^`s of the scenario will be submitted for execution.
  474. Parameters:
  475. callbacks (List[Callable]): The list of callable functions to be called on status
  476. change.
  477. force (bool): Force execution even if the data nodes are in cache.
  478. wait (bool): Wait for the orchestrated jobs created from the scenario submission to be finished in
  479. asynchronous mode.
  480. timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
  481. before returning.
  482. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  483. Returns:
  484. A `Submission^` containing the information of the submission.
  485. """
  486. from ._scenario_manager_factory import _ScenarioManagerFactory
  487. return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
  488. def export(
  489. self,
  490. folder_path: Union[str, pathlib.Path],
  491. override: bool = False,
  492. include_data: bool = False,
  493. ):
  494. """Export all related entities of this scenario to a folder.
  495. Parameters:
  496. folder_path (Union[str, pathlib.Path]): The folder path to export the scenario to.
  497. If the path exists and the override parameter is False, an exception is raised.
  498. override (bool): If True, the existing folder will be overridden. Default is False.
  499. include_data (bool): If True, the file-based data nodes are exported as well.
  500. This includes Pickle, CSV, Excel, Parquet, and JSON data nodes.
  501. If the scenario has a data node that is not file-based, a warning will be logged, and the data node
  502. will not be exported. The default value is False.
  503. Raises:
  504. ExportFolderAlreadyExist^: If the `folder_path` already exists and the override parameter is False.
  505. """
  506. from ... import core as tp
  507. return tp.export_scenario(self.id, folder_path, override, include_data)
  508. def set_primary(self):
  509. """Promote the scenario as the primary scenario of its cycle.
  510. If the cycle already has a primary scenario, it will be demoted, and it will no longer
  511. be primary for the cycle.
  512. """
  513. from ... import core as tp
  514. return tp.set_primary(self)
  515. def add_tag(self, tag: str):
  516. """Add a tag to this scenario.
  517. If the scenario's cycle already have another scenario tagged with _tag_ the other
  518. scenario will be untagged.
  519. Parameters:
  520. tag (str): The tag to add to this scenario.
  521. """
  522. from ... import core as tp
  523. return tp.tag(self, tag)
  524. def remove_tag(self, tag: str):
  525. """Remove a tag from this scenario.
  526. Parameters:
  527. tag (str): The tag to remove from the set of the scenario's tags.
  528. """
  529. from ... import core as tp
  530. return tp.untag(self, tag)
  531. def is_deletable(self) -> bool:
  532. """Indicate if the scenario can be deleted.
  533. Returns:
  534. True if the scenario can be deleted. False otherwise.
  535. """
  536. from ... import core as tp
  537. return tp.is_deletable(self)
  538. def get_label(self) -> str:
  539. """Returns the scenario simple label prefixed by its owner label.
  540. Returns:
  541. The label of the scenario as a string.
  542. """
  543. return self._get_label()
  544. def get_simple_label(self) -> str:
  545. """Returns the scenario simple label.
  546. Returns:
  547. The simple label of the scenario as a string.
  548. """
  549. return self._get_simple_label()
  550. def _is_consistent(self) -> bool:
  551. dag = self._build_dag()
  552. if dag.number_of_nodes() == 0:
  553. return True
  554. if not nx.is_directed_acyclic_graph(dag):
  555. return False
  556. for left_node, right_node in dag.edges:
  557. if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or (
  558. isinstance(left_node, Task) and isinstance(right_node, DataNode)
  559. ):
  560. continue
  561. return False
  562. return True
  563. @_make_event.register(Scenario)
  564. def _make_event_for_scenario(
  565. scenario: Scenario,
  566. operation: EventOperation,
  567. /,
  568. attribute_name: Optional[str] = None,
  569. attribute_value: Optional[Any] = None,
  570. **kwargs,
  571. ) -> Event:
  572. metadata = {"config_id": scenario.config_id, "version": scenario._version, **kwargs}
  573. return Event(
  574. entity_type=EventEntityType.SCENARIO,
  575. entity_id=scenario.id,
  576. operation=operation,
  577. attribute_name=attribute_name,
  578. attribute_value=attribute_value,
  579. metadata=metadata,
  580. )