scenario.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from __future__ import annotations
  12. import pathlib
  13. import uuid
  14. from datetime import datetime
  15. from typing import Any, Callable, Dict, List, Optional, Set, Union
  16. import networkx as nx
  17. from taipy.config.common._template_handler import _TemplateHandler as _tpl
  18. from taipy.config.common._validate_id import _validate_id
  19. from .._entity._entity import _Entity
  20. from .._entity._labeled import _Labeled
  21. from .._entity._properties import _Properties
  22. from .._entity._reload import _Reloader, _self_reload, _self_setter
  23. from .._entity.submittable import Submittable
  24. from .._version._version_manager_factory import _VersionManagerFactory
  25. from ..common._listattributes import _ListAttributes
  26. from ..common._utils import _Subscriber
  27. from ..cycle.cycle import Cycle
  28. from ..data.data_node import DataNode
  29. from ..data.data_node_id import DataNodeId
  30. from ..exceptions.exceptions import (
  31. InvalidSequence,
  32. NonExistingDataNode,
  33. NonExistingSequence,
  34. NonExistingTask,
  35. SequenceAlreadyExists,
  36. SequenceTaskDoesNotExistInScenario,
  37. )
  38. from ..job.job import Job
  39. from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
  40. from ..sequence.sequence import Sequence
  41. from ..submission.submission import Submission
  42. from ..task.task import Task
  43. from ..task.task_id import TaskId
  44. from .scenario_id import ScenarioId
  45. class Scenario(_Entity, Submittable, _Labeled):
  46. """Instance of a Business case to solve.
  47. A scenario holds a set of tasks (instances of `Task^` class) to submit for execution in order to
  48. solve the Business case. It also holds a set of additional data nodes (instances of `DataNode` class)
  49. for extra data related to the scenario.
  50. !!! note
  51. It is not recommended to instantiate a `Scenario` directly. Instead, it should be
  52. created with the `create_scenario()^` function.
  53. !!! Example
  54. ```python
  55. import taipy as tp
  56. from taipy import Config
  57. def by_two(x: int):
  58. return x * 2
  59. # Configure scenarios
  60. input_cfg = Config.configure_data_node("my_input")
  61. result_cfg = Config.configure_data_node("my_result")
  62. task_cfg = Config.configure_task("my_double", function=by_two, input=input_cfg, output=result_cfg)
  63. scenario_cfg = Config.configure_scenario("my_scenario", task_configs=[task_cfg])
  64. # Create a new scenario from the configuration
  65. scenario = tp.create_scenario(scenario_cfg)
  66. # Write the input data and submit the scenario
  67. scenario.my_input.write(3)
  68. scenario.submit()
  69. # Read the result
  70. print(scenario.my_result.read()) # Output: 6
  71. # Retrieve all scenarios
  72. all_scenarios = tp.get_scenarios()
  73. ```
  74. Attributes:
  75. config_id (str): The identifier of the `ScenarioConfig^`.
  76. tasks (Set[Task^]): The set of tasks.
  77. additional_data_nodes (Set[DataNode^]): The set of additional data nodes.
  78. sequences (Dict[str, Sequence^]): The dictionary of sequences: subsets of tasks that can be submitted
  79. together independently of the rest of the scenario's tasks.
  80. properties (dict[str, Any]): A dictionary of additional properties.
  81. scenario_id (str): The unique identifier of this scenario.
  82. creation_date (datetime): The date and time of the scenario's creation.
  83. is_primary (bool): True if the scenario is the primary of its cycle. False otherwise.
  84. cycle (Cycle^): The cycle of the scenario.
  85. subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
  86. tags (Set[str]): The list of scenario's tags.
  87. version (str): The string indicates the application version of the scenario to instantiate.
  88. If not provided, the latest version is used.
  89. """
  90. _ID_PREFIX = "SCENARIO"
  91. _MANAGER_NAME = "scenario"
  92. _MIGRATED_SEQUENCES_KEY = "sequences"
  93. __SEPARATOR = "_"
  94. _SEQUENCE_TASKS_KEY = "tasks"
  95. _SEQUENCE_PROPERTIES_KEY = "properties"
  96. _SEQUENCE_SUBSCRIBERS_KEY = "subscribers"
  97. def __init__(
  98. self,
  99. config_id: str,
  100. tasks: Optional[Union[Set[TaskId], Set[Task]]],
  101. properties: Dict[str, Any],
  102. additional_data_nodes: Optional[Union[Set[DataNodeId], Set[DataNode]]] = None,
  103. scenario_id: Optional[ScenarioId] = None,
  104. creation_date: Optional[datetime] = None,
  105. is_primary: bool = False,
  106. cycle: Optional[Cycle] = None,
  107. subscribers: Optional[List[_Subscriber]] = None,
  108. tags: Optional[Set[str]] = None,
  109. version: str = None,
  110. sequences: Optional[Dict[str, Dict]] = None,
  111. ):
  112. self._config_id = _validate_id(config_id)
  113. self.id: ScenarioId = scenario_id or self._new_id(self.config_id)
  114. super().__init__(self.id, subscribers or [])
  115. self._tasks: Union[Set[TaskId], Set[Task], Set] = tasks or set()
  116. self._additional_data_nodes: Union[Set[DataNodeId], Set[DataNode], Set] = additional_data_nodes or set()
  117. self._creation_date = creation_date or datetime.now()
  118. self._cycle = cycle
  119. self._primary_scenario = is_primary
  120. self._tags = tags or set()
  121. self._properties = _Properties(self, **properties)
  122. self._sequences: Dict[str, Dict] = sequences or {}
  123. _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in self._tasks}
  124. for sequence_name, sequence_data in self._sequences.items():
  125. sequence_task_ids = {task.id if isinstance(task, Task) else task for task in sequence_data.get("tasks", [])}
  126. self.__check_sequence_tasks_exist_in_scenario_tasks(
  127. sequence_name, sequence_task_ids, self.id, _scenario_task_ids
  128. )
  129. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  130. @staticmethod
  131. def _new_id(config_id: str) -> ScenarioId:
  132. """Generate a unique scenario identifier."""
  133. return ScenarioId(Scenario.__SEPARATOR.join([Scenario._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())]))
  134. def __getstate__(self):
  135. return self.id
  136. def __setstate__(self, id):
  137. from ... import core as tp
  138. sc = tp.get(id)
  139. self.__dict__ = sc.__dict__
  140. def __hash__(self):
  141. return hash(self.id)
  142. def __eq__(self, other):
  143. return isinstance(other, Scenario) and self.id == other.id
  144. def __getattr__(self, attribute_name):
  145. protected_attribute_name = _validate_id(attribute_name)
  146. if protected_attribute_name in self._properties:
  147. return _tpl._replace_templates(self._properties[protected_attribute_name])
  148. sequences = self._get_sequences()
  149. if protected_attribute_name in sequences:
  150. return sequences[protected_attribute_name]
  151. tasks = self.tasks
  152. if protected_attribute_name in tasks:
  153. return tasks[protected_attribute_name]
  154. data_nodes = self.data_nodes
  155. if protected_attribute_name in data_nodes:
  156. return data_nodes[protected_attribute_name]
  157. raise AttributeError(f"{attribute_name} is not an attribute of scenario {self.id}")
  158. @property
  159. def config_id(self):
  160. return self._config_id
  161. @property # type: ignore
  162. @_self_reload(_MANAGER_NAME)
  163. def sequences(self) -> Dict[str, Sequence]:
  164. return self._get_sequences()
  165. @sequences.setter # type: ignore
  166. @_self_setter(_MANAGER_NAME)
  167. def sequences(
  168. self, sequences: Dict[str, Dict[str, Union[List[Task], List[TaskId], _ListAttributes, List[_Subscriber], Dict]]]
  169. ):
  170. self._sequences = sequences
  171. actual_sequences = self._get_sequences()
  172. for sequence_name in sequences.keys():
  173. if not actual_sequences[sequence_name]._is_consistent():
  174. raise InvalidSequence(actual_sequences[sequence_name].id)
  175. def add_sequence(
  176. self,
  177. name: str,
  178. tasks: Union[List[Task], List[TaskId]],
  179. properties: Optional[Dict] = None,
  180. subscribers: Optional[List[_Subscriber]] = None,
  181. ):
  182. """Add a sequence to the scenario.
  183. Parameters:
  184. name (str): The name of the sequence.
  185. tasks (Union[List[Task], List[TaskId]]): The list of scenario's tasks to add to the sequence.
  186. properties (Optional[Dict]): The optional properties of the sequence.
  187. subscribers (Optional[List[_Subscriber]]): The optional list of callbacks to be called on
  188. `Job^`'s status change.
  189. Raises:
  190. SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
  191. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  192. """
  193. if name in self.sequences:
  194. raise SequenceAlreadyExists(name, self.id)
  195. seq = self._set_sequence(name, tasks, properties, subscribers)
  196. Notifier.publish(_make_event(seq, EventOperation.CREATION))
  197. def update_sequence(
  198. self,
  199. name: str,
  200. tasks: Union[List[Task], List[TaskId]],
  201. properties: Optional[Dict] = None,
  202. subscribers: Optional[List[_Subscriber]] = None,
  203. ):
  204. """Update an existing sequence.
  205. Parameters:
  206. name (str): The name of the sequence to update.
  207. tasks (Union[List[Task], List[TaskId]]): The new list of scenario's tasks.
  208. properties (Optional[Dict]): The new properties of the sequence.
  209. subscribers (Optional[List[_Subscriber]]): The new list of callbacks to be called on `Job^`'s status change.
  210. Raises:
  211. SequenceTaskDoesNotExistInScenario^: If a task in the list does not exist in the scenario.
  212. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  213. """
  214. if name not in self.sequences:
  215. raise NonExistingSequence(name, self.id)
  216. seq = self._set_sequence(name, tasks, properties, subscribers)
  217. Notifier.publish(_make_event(seq, EventOperation.UPDATE))
  218. def _set_sequence(
  219. self,
  220. name: str,
  221. tasks: Union[List[Task], List[TaskId]],
  222. properties: Optional[Dict] = None,
  223. subscribers: Optional[List[_Subscriber]] = None,
  224. ) -> Sequence:
  225. _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
  226. _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
  227. _sequence_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
  228. self.__check_sequence_tasks_exist_in_scenario_tasks(name, _sequence_task_ids, self.id, _scenario_task_ids)
  229. from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
  230. seq_manager = _SequenceManagerFactory._build_manager()
  231. seq = seq_manager._create(name, tasks, subscribers or [], properties or {}, self.id, self.version)
  232. if not seq._is_consistent():
  233. raise InvalidSequence(name)
  234. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  235. _sequences.update(
  236. {
  237. name: {
  238. self._SEQUENCE_TASKS_KEY: tasks,
  239. self._SEQUENCE_PROPERTIES_KEY: properties or {},
  240. self._SEQUENCE_SUBSCRIBERS_KEY: subscribers or [],
  241. }
  242. }
  243. )
  244. self.sequences = _sequences # type: ignore
  245. return seq
  246. def add_sequences(self, sequences: Dict[str, Union[List[Task], List[TaskId]]]):
  247. """Add multiple sequences to the scenario.
  248. Note:
  249. To provide properties and subscribers for the sequences, use `Scenario.add_sequence^` instead.
  250. Parameters:
  251. sequences (Dict[str, Union[List[Task], List[TaskId]]]):
  252. A dictionary containing sequences to add. Each key is a sequence name, and the value must
  253. be a list of the scenario tasks.
  254. Raises:
  255. SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
  256. """
  257. _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
  258. _sc_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
  259. for name, tasks in sequences.items():
  260. _seq_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
  261. self.__check_sequence_tasks_exist_in_scenario_tasks(name, _seq_task_ids, self.id, _sc_task_ids)
  262. # Need to parse twice the sequences to avoid adding some sequences and not others in case of exception
  263. for name, tasks in sequences.items():
  264. self.add_sequence(name, tasks)
  265. def remove_sequence(self, name: str):
  266. """Remove a sequence from the scenario.
  267. Parameters:
  268. name (str): The name of the sequence to remove.
  269. """
  270. seq_id = self.sequences[name].id
  271. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  272. _sequences.pop(name)
  273. self.sequences = _sequences # type: ignore
  274. Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
  275. def remove_sequences(self, sequence_names: List[str]):
  276. """
  277. Remove multiple sequences from the scenario.
  278. Parameters:
  279. sequence_names (List[str]): A list of sequence names to remove.
  280. """
  281. _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
  282. for sequence_name in sequence_names:
  283. seq_id = self.sequences[sequence_name].id
  284. _sequences.pop(sequence_name)
  285. Notifier.publish(
  286. Event(
  287. EventEntityType.SEQUENCE,
  288. EventOperation.DELETION,
  289. entity_id=seq_id,
  290. )
  291. )
  292. self.sequences = _sequences # type: ignore
  293. def rename_sequence(self, old_name, new_name):
  294. """Rename a sequence of the scenario.
  295. Parameters:
  296. old_name (str): The current name of the sequence to rename.
  297. new_name (str): The new name of the sequence.
  298. Raises:
  299. SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
  300. """
  301. if old_name == new_name:
  302. return
  303. if new_name in self.sequences:
  304. raise SequenceAlreadyExists(new_name, self.id)
  305. self._sequences[new_name] = self._sequences[old_name]
  306. del self._sequences[old_name]
  307. self.sequences = self._sequences # type: ignore
  308. Notifier.publish(
  309. Event(
  310. EventEntityType.SCENARIO,
  311. EventOperation.UPDATE,
  312. entity_id=self.id,
  313. attribute_name="sequences",
  314. attribute_value=self._sequences,
  315. )
  316. )
  317. @staticmethod
  318. def __check_sequence_tasks_exist_in_scenario_tasks(
  319. sequence_name: str, sequence_task_ids: Set[TaskId], scenario_id: ScenarioId, scenario_task_ids: Set[TaskId]
  320. ):
  321. non_existing_sequence_task_ids_in_scenario = set()
  322. for sequence_task_id in sequence_task_ids:
  323. if sequence_task_id not in scenario_task_ids:
  324. non_existing_sequence_task_ids_in_scenario.add(sequence_task_id)
  325. if len(non_existing_sequence_task_ids_in_scenario) > 0:
  326. raise SequenceTaskDoesNotExistInScenario(
  327. list(non_existing_sequence_task_ids_in_scenario), sequence_name, scenario_id
  328. )
  329. def _get_sequences(self) -> Dict[str, Sequence]:
  330. _sequences = {}
  331. from ..sequence._sequence_manager_factory import _SequenceManagerFactory
  332. sequence_manager = _SequenceManagerFactory._build_manager()
  333. for sequence_name, sequence_data in self._sequences.items():
  334. p = sequence_manager._create(
  335. sequence_name,
  336. sequence_data.get(self._SEQUENCE_TASKS_KEY, []),
  337. sequence_data.get(self._SEQUENCE_SUBSCRIBERS_KEY, []),
  338. sequence_data.get(self._SEQUENCE_PROPERTIES_KEY, {}),
  339. self.id,
  340. self.version,
  341. )
  342. if not isinstance(p, Sequence):
  343. raise NonExistingSequence(sequence_name, self.id)
  344. _sequences[sequence_name] = p
  345. return _sequences
  346. @property # type: ignore
  347. @_self_reload(_MANAGER_NAME)
  348. def tasks(self) -> Dict[str, Task]:
  349. return self.__get_tasks()
  350. def __get_tasks(self) -> Dict[str, Task]:
  351. from ..task._task_manager_factory import _TaskManagerFactory
  352. _tasks = {}
  353. task_manager = _TaskManagerFactory._build_manager()
  354. for task_or_id in self._tasks:
  355. t = task_manager._get(task_or_id, task_or_id)
  356. if not isinstance(t, Task):
  357. raise NonExistingTask(task_or_id)
  358. _tasks[t.config_id] = t
  359. return _tasks
  360. @tasks.setter # type: ignore
  361. @_self_setter(_MANAGER_NAME)
  362. def tasks(self, val: Union[Set[TaskId], Set[Task]]):
  363. self._tasks = set(val)
  364. @property # type: ignore
  365. @_self_reload(_MANAGER_NAME)
  366. def additional_data_nodes(self) -> Dict[str, DataNode]:
  367. return self.__get_additional_data_nodes()
  368. def __get_additional_data_nodes(self):
  369. from ..data._data_manager_factory import _DataManagerFactory
  370. additional_data_nodes = {}
  371. data_manager = _DataManagerFactory._build_manager()
  372. for dn_or_id in self._additional_data_nodes:
  373. dn = data_manager._get(dn_or_id, dn_or_id)
  374. if not isinstance(dn, DataNode):
  375. raise NonExistingDataNode(dn_or_id)
  376. additional_data_nodes[dn.config_id] = dn
  377. return additional_data_nodes
  378. @additional_data_nodes.setter # type: ignore
  379. @_self_setter(_MANAGER_NAME)
  380. def additional_data_nodes(self, val: Union[Set[TaskId], Set[DataNode]]):
  381. self._additional_data_nodes = set(val)
  382. def _get_set_of_tasks(self) -> Set[Task]:
  383. return set(self.tasks.values())
  384. @property # type: ignore
  385. @_self_reload(_MANAGER_NAME)
  386. def data_nodes(self) -> Dict[str, DataNode]:
  387. data_nodes_dict = self.__get_additional_data_nodes()
  388. for _, task in self.__get_tasks().items():
  389. data_nodes_dict.update(task.data_nodes)
  390. return data_nodes_dict
  391. @property # type: ignore
  392. @_self_reload(_MANAGER_NAME)
  393. def creation_date(self):
  394. return self._creation_date
  395. @creation_date.setter # type: ignore
  396. @_self_setter(_MANAGER_NAME)
  397. def creation_date(self, val):
  398. self._creation_date = val
  399. @property # type: ignore
  400. @_self_reload(_MANAGER_NAME)
  401. def cycle(self):
  402. return self._cycle
  403. @cycle.setter # type: ignore
  404. @_self_setter(_MANAGER_NAME)
  405. def cycle(self, val):
  406. self._cycle = val
  407. @property # type: ignore
  408. @_self_reload(_MANAGER_NAME)
  409. def is_primary(self):
  410. return self._primary_scenario
  411. @is_primary.setter # type: ignore
  412. @_self_setter(_MANAGER_NAME)
  413. def is_primary(self, val):
  414. self._primary_scenario = val
  415. @property # type: ignore
  416. @_self_reload(_MANAGER_NAME)
  417. def subscribers(self):
  418. return self._subscribers
  419. @subscribers.setter # type: ignore
  420. @_self_setter(_MANAGER_NAME)
  421. def subscribers(self, val):
  422. self._subscribers = _ListAttributes(self, val)
  423. @property # type: ignore
  424. @_self_reload(_MANAGER_NAME)
  425. def tags(self):
  426. return self._tags
  427. @tags.setter # type: ignore
  428. @_self_setter(_MANAGER_NAME)
  429. def tags(self, val):
  430. self._tags = val or set()
  431. @property
  432. def version(self):
  433. return self._version
  434. @property
  435. def owner_id(self):
  436. return self._cycle.id
  437. @property
  438. def properties(self):
  439. self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
  440. return self._properties
  441. @property # type: ignore
  442. def name(self) -> Optional[str]:
  443. return self.properties.get("name")
  444. @name.setter # type: ignore
  445. def name(self, val):
  446. self.properties["name"] = val
  447. def has_tag(self, tag: str) -> bool:
  448. """Indicate if the scenario has a given tag.
  449. Parameters:
  450. tag (str): The tag to search among the set of scenario's tags.
  451. Returns:
  452. True if the scenario has the tag given as parameter. False otherwise.
  453. """
  454. return tag in self.tags
  455. def _add_tag(self, tag: str):
  456. self._tags = _Reloader()._reload("scenario", self)._tags
  457. self._tags.add(tag)
  458. def _remove_tag(self, tag: str):
  459. self._tags = _Reloader()._reload("scenario", self)._tags
  460. if self.has_tag(tag):
  461. self._tags.remove(tag)
  462. def subscribe(
  463. self,
  464. callback: Callable[[Scenario, Job], None],
  465. params: Optional[List[Any]] = None,
  466. ):
  467. """Subscribe a function to be called on `Job^` status change.
  468. The subscription is applied to all jobs created from the scenario's execution.
  469. Parameters:
  470. callback (Callable[[Scenario^, Job^], None]): The callable function to be called
  471. on status change.
  472. params (Optional[List[Any]]): The parameters to be passed to the _callback_.
  473. Note:
  474. Notification will be available only for jobs created after this subscription.
  475. """
  476. from ... import core as tp
  477. return tp.subscribe_scenario(callback, params, self)
  478. def unsubscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None):
  479. """Unsubscribe a function that is called when the status of a `Job^` changes.
  480. Parameters:
  481. callback (Callable[[Scenario^, Job^], None]): The callable function to unsubscribe.
  482. params (Optional[List[Any]]): The parameters to be passed to the _callback_.
  483. Note:
  484. The function will continue to be called for ongoing jobs.
  485. """
  486. from ... import core as tp
  487. return tp.unsubscribe_scenario(callback, params, self)
  488. def submit(
  489. self,
  490. callbacks: Optional[List[Callable]] = None,
  491. force: bool = False,
  492. wait: bool = False,
  493. timeout: Optional[Union[float, int]] = None,
  494. **properties,
  495. ) -> Submission:
  496. """Submit this scenario for execution.
  497. All the `Task^`s of the scenario will be submitted for execution.
  498. Parameters:
  499. callbacks (List[Callable]): The list of callable functions to be called on status
  500. change.
  501. force (bool): Force execution even if the data nodes are in cache.
  502. wait (bool): Wait for the orchestrated jobs created from the scenario submission to be finished in
  503. asynchronous mode.
  504. timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
  505. before returning.
  506. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  507. Returns:
  508. A `Submission^` containing the information of the submission.
  509. """
  510. from ._scenario_manager_factory import _ScenarioManagerFactory
  511. return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
  512. def export(
  513. self,
  514. folder_path: Union[str, pathlib.Path],
  515. override: bool = False,
  516. include_data: bool = False,
  517. ):
  518. """Export all related entities of this scenario to a folder.
  519. Parameters:
  520. folder_path (Union[str, pathlib.Path]): The folder path to export the scenario to.
  521. If the path exists and the override parameter is False, an exception is raised.
  522. override (bool): If True, the existing folder will be overridden. Default is False.
  523. include_data (bool): If True, the file-based data nodes are exported as well.
  524. This includes Pickle, CSV, Excel, Parquet, and JSON data nodes.
  525. If the scenario has a data node that is not file-based, a warning will be logged, and the data node
  526. will not be exported. The default value is False.
  527. Raises:
  528. ExportFolderAlreadyExist^: If the `folder_path` already exists and the override parameter is False.
  529. """
  530. from ... import core as tp
  531. return tp.export_scenario(self.id, folder_path, override, include_data)
  532. def set_primary(self):
  533. """Promote the scenario as the primary scenario of its cycle.
  534. If the cycle already has a primary scenario, it will be demoted, and it will no longer
  535. be primary for the cycle.
  536. """
  537. from ... import core as tp
  538. return tp.set_primary(self)
  539. def add_tag(self, tag: str):
  540. """Add a tag to this scenario.
  541. If the scenario's cycle already have another scenario tagged with _tag_ the other
  542. scenario will be untagged.
  543. Parameters:
  544. tag (str): The tag to add to this scenario.
  545. """
  546. from ... import core as tp
  547. return tp.tag(self, tag)
  548. def remove_tag(self, tag: str):
  549. """Remove a tag from this scenario.
  550. Parameters:
  551. tag (str): The tag to remove from the set of the scenario's tags.
  552. """
  553. from ... import core as tp
  554. return tp.untag(self, tag)
  555. def is_deletable(self) -> bool:
  556. """Indicate if the scenario can be deleted.
  557. Returns:
  558. True if the scenario can be deleted. False otherwise.
  559. """
  560. from ... import core as tp
  561. return tp.is_deletable(self)
  562. def get_label(self) -> str:
  563. """Returns the scenario simple label prefixed by its owner label.
  564. Returns:
  565. The label of the scenario as a string.
  566. """
  567. return self._get_label()
  568. def get_simple_label(self) -> str:
  569. """Returns the scenario simple label.
  570. Returns:
  571. The simple label of the scenario as a string.
  572. """
  573. return self._get_simple_label()
  574. def _is_consistent(self) -> bool:
  575. dag = self._build_dag()
  576. if dag.number_of_nodes() == 0:
  577. return True
  578. if not nx.is_directed_acyclic_graph(dag):
  579. return False
  580. for left_node, right_node in dag.edges:
  581. if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or (
  582. isinstance(left_node, Task) and isinstance(right_node, DataNode)
  583. ):
  584. continue
  585. return False
  586. return True
  587. @_make_event.register(Scenario)
  588. def _make_event_for_scenario(
  589. scenario: Scenario,
  590. operation: EventOperation,
  591. /,
  592. attribute_name: Optional[str] = None,
  593. attribute_value: Optional[Any] = None,
  594. **kwargs,
  595. ) -> Event:
  596. metadata = {"config_id": scenario.config_id, "version": scenario._version, **kwargs}
  597. return Event(
  598. entity_type=EventEntityType.SCENARIO,
  599. entity_id=scenario.id,
  600. operation=operation,
  601. attribute_name=attribute_name,
  602. attribute_value=attribute_value,
  603. metadata=metadata,
  604. )