123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from __future__ import annotations
- import pathlib
- import uuid
- from datetime import datetime
- from typing import Any, Callable, Dict, List, Optional, Set, Union
- import networkx as nx
- from taipy.config.common._template_handler import _TemplateHandler as _tpl
- from taipy.config.common._validate_id import _validate_id
- from .._entity._entity import _Entity
- from .._entity._labeled import _Labeled
- from .._entity._properties import _Properties
- from .._entity._reload import _Reloader, _self_reload, _self_setter
- from .._entity.submittable import Submittable
- from .._version._version_manager_factory import _VersionManagerFactory
- from ..common._listattributes import _ListAttributes
- from ..common._utils import _Subscriber
- from ..cycle.cycle import Cycle
- from ..data.data_node import DataNode
- from ..data.data_node_id import DataNodeId
- from ..exceptions.exceptions import (
- InvalidSequence,
- NonExistingDataNode,
- NonExistingSequence,
- NonExistingTask,
- SequenceAlreadyExists,
- SequenceTaskDoesNotExistInScenario,
- )
- from ..job.job import Job
- from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
- from ..sequence.sequence import Sequence
- from ..submission.submission import Submission
- from ..task.task import Task
- from ..task.task_id import TaskId
- from .scenario_id import ScenarioId
- class Scenario(_Entity, Submittable, _Labeled):
- """Instance of a Business case to solve.
- A scenario holds a set of tasks (instances of `Task^` class) to submit for execution in order to
- solve the Business case. It also holds a set of additional data nodes (instances of `DataNode` class)
- for extra data related to the scenario.
- !!! note
- It is not recommended to instantiate a `Scenario` directly. Instead, it should be
- created with the `create_scenario()^` function.
- !!! Example
- ```python
- import taipy as tp
- from taipy import Config
- def by_two(x: int):
- return x * 2
- # Configure scenarios
- input_cfg = Config.configure_data_node("my_input")
- result_cfg = Config.configure_data_node("my_result")
- task_cfg = Config.configure_task("my_double", function=by_two, input=input_cfg, output=result_cfg)
- scenario_cfg = Config.configure_scenario("my_scenario", task_configs=[task_cfg])
- # Create a new scenario from the configuration
- scenario = tp.create_scenario(scenario_cfg)
- # Write the input data and submit the scenario
- scenario.my_input.write(3)
- scenario.submit()
- # Read the result
- print(scenario.my_result.read()) # Output: 6
- # Retrieve all scenarios
- all_scenarios = tp.get_scenarios()
- ```
- Attributes:
- config_id (str): The identifier of the `ScenarioConfig^`.
- tasks (Set[Task^]): The set of tasks.
- additional_data_nodes (Set[DataNode^]): The set of additional data nodes.
- sequences (Dict[str, Sequence^]): The dictionary of sequences: subsets of tasks that can be submitted
- together independently of the rest of the scenario's tasks.
- properties (dict[str, Any]): A dictionary of additional properties.
- scenario_id (str): The unique identifier of this scenario.
- creation_date (datetime): The date and time of the scenario's creation.
- is_primary (bool): True if the scenario is the primary of its cycle. False otherwise.
- cycle (Cycle^): The cycle of the scenario.
- subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
- tags (Set[str]): The list of scenario's tags.
- version (str): The string indicates the application version of the scenario to instantiate.
- If not provided, the latest version is used.
- """
- _ID_PREFIX = "SCENARIO"
- _MANAGER_NAME = "scenario"
- _MIGRATED_SEQUENCES_KEY = "sequences"
- __SEPARATOR = "_"
- _SEQUENCE_TASKS_KEY = "tasks"
- _SEQUENCE_PROPERTIES_KEY = "properties"
- _SEQUENCE_SUBSCRIBERS_KEY = "subscribers"
- def __init__(
- self,
- config_id: str,
- tasks: Optional[Union[Set[TaskId], Set[Task]]],
- properties: Dict[str, Any],
- additional_data_nodes: Optional[Union[Set[DataNodeId], Set[DataNode]]] = None,
- scenario_id: Optional[ScenarioId] = None,
- creation_date: Optional[datetime] = None,
- is_primary: bool = False,
- cycle: Optional[Cycle] = None,
- subscribers: Optional[List[_Subscriber]] = None,
- tags: Optional[Set[str]] = None,
- version: str = None,
- sequences: Optional[Dict[str, Dict]] = None,
- ):
- self._config_id = _validate_id(config_id)
- self.id: ScenarioId = scenario_id or self._new_id(self.config_id)
- super().__init__(self.id, subscribers or [])
- self._tasks: Union[Set[TaskId], Set[Task], Set] = tasks or set()
- self._additional_data_nodes: Union[Set[DataNodeId], Set[DataNode], Set] = additional_data_nodes or set()
- self._creation_date = creation_date or datetime.now()
- self._cycle = cycle
- self._primary_scenario = is_primary
- self._tags = tags or set()
- self._properties = _Properties(self, **properties)
- self._sequences: Dict[str, Dict] = sequences or {}
- _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in self._tasks}
- for sequence_name, sequence_data in self._sequences.items():
- sequence_task_ids = {task.id if isinstance(task, Task) else task for task in sequence_data.get("tasks", [])}
- self.__check_sequence_tasks_exist_in_scenario_tasks(
- sequence_name, sequence_task_ids, self.id, _scenario_task_ids
- )
- self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
- @staticmethod
- def _new_id(config_id: str) -> ScenarioId:
- """Generate a unique scenario identifier."""
- return ScenarioId(Scenario.__SEPARATOR.join([Scenario._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())]))
- def __getstate__(self):
- return self.id
- def __setstate__(self, id):
- from ... import core as tp
- sc = tp.get(id)
- self.__dict__ = sc.__dict__
- def __hash__(self):
- return hash(self.id)
- def __eq__(self, other):
- return isinstance(other, Scenario) and self.id == other.id
- def __getattr__(self, attribute_name):
- protected_attribute_name = _validate_id(attribute_name)
- if protected_attribute_name in self._properties:
- return _tpl._replace_templates(self._properties[protected_attribute_name])
- sequences = self._get_sequences()
- if protected_attribute_name in sequences:
- return sequences[protected_attribute_name]
- tasks = self.tasks
- if protected_attribute_name in tasks:
- return tasks[protected_attribute_name]
- data_nodes = self.data_nodes
- if protected_attribute_name in data_nodes:
- return data_nodes[protected_attribute_name]
- raise AttributeError(f"{attribute_name} is not an attribute of scenario {self.id}")
- @property
- def config_id(self):
- return self._config_id
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def sequences(self) -> Dict[str, Sequence]:
- return self._get_sequences()
- @sequences.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def sequences(
- self, sequences: Dict[str, Dict[str, Union[List[Task], List[TaskId], _ListAttributes, List[_Subscriber], Dict]]]
- ):
- self._sequences = sequences
- actual_sequences = self._get_sequences()
- for sequence_name in sequences.keys():
- if not actual_sequences[sequence_name]._is_consistent():
- raise InvalidSequence(actual_sequences[sequence_name].id)
- def add_sequence(
- self,
- name: str,
- tasks: Union[List[Task], List[TaskId]],
- properties: Optional[Dict] = None,
- subscribers: Optional[List[_Subscriber]] = None,
- ):
- """Add a sequence to the scenario.
- Parameters:
- name (str): The name of the sequence.
- tasks (Union[List[Task], List[TaskId]]): The list of scenario's tasks to add to the sequence.
- properties (Optional[Dict]): The optional properties of the sequence.
- subscribers (Optional[List[_Subscriber]]): The optional list of callbacks to be called on
- `Job^`'s status change.
- Raises:
- SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
- """
- if name in self.sequences:
- raise SequenceAlreadyExists(name, self.id)
- seq = self._set_sequence(name, tasks, properties, subscribers)
- Notifier.publish(_make_event(seq, EventOperation.CREATION))
- def update_sequence(
- self,
- name: str,
- tasks: Union[List[Task], List[TaskId]],
- properties: Optional[Dict] = None,
- subscribers: Optional[List[_Subscriber]] = None,
- ):
- """Update an existing sequence.
- Parameters:
- name (str): The name of the sequence to update.
- tasks (Union[List[Task], List[TaskId]]): The new list of scenario's tasks.
- properties (Optional[Dict]): The new properties of the sequence.
- subscribers (Optional[List[_Subscriber]]): The new list of callbacks to be called on `Job^`'s status change.
- Raises:
- SequenceTaskDoesNotExistInScenario^: If a task in the list does not exist in the scenario.
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
- """
- if name not in self.sequences:
- raise NonExistingSequence(name, self.id)
- seq = self._set_sequence(name, tasks, properties, subscribers)
- Notifier.publish(_make_event(seq, EventOperation.UPDATE))
- def _set_sequence(
- self,
- name: str,
- tasks: Union[List[Task], List[TaskId]],
- properties: Optional[Dict] = None,
- subscribers: Optional[List[_Subscriber]] = None,
- ) -> Sequence:
- _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
- _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
- _sequence_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
- self.__check_sequence_tasks_exist_in_scenario_tasks(name, _sequence_task_ids, self.id, _scenario_task_ids)
- from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
- seq_manager = _SequenceManagerFactory._build_manager()
- seq = seq_manager._create(name, tasks, subscribers or [], properties or {}, self.id, self.version)
- if not seq._is_consistent():
- raise InvalidSequence(name)
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
- _sequences.update(
- {
- name: {
- self._SEQUENCE_TASKS_KEY: tasks,
- self._SEQUENCE_PROPERTIES_KEY: properties or {},
- self._SEQUENCE_SUBSCRIBERS_KEY: subscribers or [],
- }
- }
- )
- self.sequences = _sequences # type: ignore
- return seq
- def add_sequences(self, sequences: Dict[str, Union[List[Task], List[TaskId]]]):
- """Add multiple sequences to the scenario.
- Note:
- To provide properties and subscribers for the sequences, use `Scenario.add_sequence^` instead.
- Parameters:
- sequences (Dict[str, Union[List[Task], List[TaskId]]]):
- A dictionary containing sequences to add. Each key is a sequence name, and the value must
- be a list of the scenario tasks.
- Raises:
- SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
- """
- _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
- _sc_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
- for name, tasks in sequences.items():
- _seq_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
- self.__check_sequence_tasks_exist_in_scenario_tasks(name, _seq_task_ids, self.id, _sc_task_ids)
- # Need to parse twice the sequences to avoid adding some sequences and not others in case of exception
- for name, tasks in sequences.items():
- self.add_sequence(name, tasks)
- def remove_sequence(self, name: str):
- """Remove a sequence from the scenario.
- Parameters:
- name (str): The name of the sequence to remove.
- """
- seq_id = self.sequences[name].id
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
- _sequences.pop(name)
- self.sequences = _sequences # type: ignore
- Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
- def remove_sequences(self, sequence_names: List[str]):
- """
- Remove multiple sequences from the scenario.
- Parameters:
- sequence_names (List[str]): A list of sequence names to remove.
- """
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
- for sequence_name in sequence_names:
- seq_id = self.sequences[sequence_name].id
- _sequences.pop(sequence_name)
- Notifier.publish(
- Event(
- EventEntityType.SEQUENCE,
- EventOperation.DELETION,
- entity_id=seq_id,
- )
- )
- self.sequences = _sequences # type: ignore
- def rename_sequence(self, old_name, new_name):
- """Rename a sequence of the scenario.
- Parameters:
- old_name (str): The current name of the sequence to rename.
- new_name (str): The new name of the sequence.
- Raises:
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
- """
- if old_name == new_name:
- return
- if new_name in self.sequences:
- raise SequenceAlreadyExists(new_name, self.id)
- self._sequences[new_name] = self._sequences[old_name]
- del self._sequences[old_name]
- self.sequences = self._sequences # type: ignore
- Notifier.publish(
- Event(
- EventEntityType.SCENARIO,
- EventOperation.UPDATE,
- entity_id=self.id,
- attribute_name="sequences",
- attribute_value=self._sequences,
- )
- )
- @staticmethod
- def __check_sequence_tasks_exist_in_scenario_tasks(
- sequence_name: str, sequence_task_ids: Set[TaskId], scenario_id: ScenarioId, scenario_task_ids: Set[TaskId]
- ):
- non_existing_sequence_task_ids_in_scenario = set()
- for sequence_task_id in sequence_task_ids:
- if sequence_task_id not in scenario_task_ids:
- non_existing_sequence_task_ids_in_scenario.add(sequence_task_id)
- if len(non_existing_sequence_task_ids_in_scenario) > 0:
- raise SequenceTaskDoesNotExistInScenario(
- list(non_existing_sequence_task_ids_in_scenario), sequence_name, scenario_id
- )
- def _get_sequences(self) -> Dict[str, Sequence]:
- _sequences = {}
- from ..sequence._sequence_manager_factory import _SequenceManagerFactory
- sequence_manager = _SequenceManagerFactory._build_manager()
- for sequence_name, sequence_data in self._sequences.items():
- p = sequence_manager._create(
- sequence_name,
- sequence_data.get(self._SEQUENCE_TASKS_KEY, []),
- sequence_data.get(self._SEQUENCE_SUBSCRIBERS_KEY, []),
- sequence_data.get(self._SEQUENCE_PROPERTIES_KEY, {}),
- self.id,
- self.version,
- )
- if not isinstance(p, Sequence):
- raise NonExistingSequence(sequence_name, self.id)
- _sequences[sequence_name] = p
- return _sequences
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def tasks(self) -> Dict[str, Task]:
- return self.__get_tasks()
- def __get_tasks(self) -> Dict[str, Task]:
- from ..task._task_manager_factory import _TaskManagerFactory
- _tasks = {}
- task_manager = _TaskManagerFactory._build_manager()
- for task_or_id in self._tasks:
- t = task_manager._get(task_or_id, task_or_id)
- if not isinstance(t, Task):
- raise NonExistingTask(task_or_id)
- _tasks[t.config_id] = t
- return _tasks
- @tasks.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def tasks(self, val: Union[Set[TaskId], Set[Task]]):
- self._tasks = set(val)
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def additional_data_nodes(self) -> Dict[str, DataNode]:
- return self.__get_additional_data_nodes()
- def __get_additional_data_nodes(self):
- from ..data._data_manager_factory import _DataManagerFactory
- additional_data_nodes = {}
- data_manager = _DataManagerFactory._build_manager()
- for dn_or_id in self._additional_data_nodes:
- dn = data_manager._get(dn_or_id, dn_or_id)
- if not isinstance(dn, DataNode):
- raise NonExistingDataNode(dn_or_id)
- additional_data_nodes[dn.config_id] = dn
- return additional_data_nodes
- @additional_data_nodes.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def additional_data_nodes(self, val: Union[Set[TaskId], Set[DataNode]]):
- self._additional_data_nodes = set(val)
- def _get_set_of_tasks(self) -> Set[Task]:
- return set(self.tasks.values())
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def data_nodes(self) -> Dict[str, DataNode]:
- data_nodes_dict = self.__get_additional_data_nodes()
- for _, task in self.__get_tasks().items():
- data_nodes_dict.update(task.data_nodes)
- return data_nodes_dict
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def creation_date(self):
- return self._creation_date
- @creation_date.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def creation_date(self, val):
- self._creation_date = val
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def cycle(self):
- return self._cycle
- @cycle.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def cycle(self, val):
- self._cycle = val
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def is_primary(self):
- return self._primary_scenario
- @is_primary.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def is_primary(self, val):
- self._primary_scenario = val
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def subscribers(self):
- return self._subscribers
- @subscribers.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def subscribers(self, val):
- self._subscribers = _ListAttributes(self, val)
- @property # type: ignore
- @_self_reload(_MANAGER_NAME)
- def tags(self):
- return self._tags
- @tags.setter # type: ignore
- @_self_setter(_MANAGER_NAME)
- def tags(self, val):
- self._tags = val or set()
- @property
- def version(self):
- return self._version
- @property
- def owner_id(self):
- return self._cycle.id
- @property
- def properties(self):
- self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
- return self._properties
- @property # type: ignore
- def name(self) -> Optional[str]:
- return self.properties.get("name")
- @name.setter # type: ignore
- def name(self, val):
- self.properties["name"] = val
- def has_tag(self, tag: str) -> bool:
- """Indicate if the scenario has a given tag.
- Parameters:
- tag (str): The tag to search among the set of scenario's tags.
- Returns:
- True if the scenario has the tag given as parameter. False otherwise.
- """
- return tag in self.tags
- def _add_tag(self, tag: str):
- self._tags = _Reloader()._reload("scenario", self)._tags
- self._tags.add(tag)
- def _remove_tag(self, tag: str):
- self._tags = _Reloader()._reload("scenario", self)._tags
- if self.has_tag(tag):
- self._tags.remove(tag)
- def subscribe(
- self,
- callback: Callable[[Scenario, Job], None],
- params: Optional[List[Any]] = None,
- ):
- """Subscribe a function to be called on `Job^` status change.
- The subscription is applied to all jobs created from the scenario's execution.
- Parameters:
- callback (Callable[[Scenario^, Job^], None]): The callable function to be called
- on status change.
- params (Optional[List[Any]]): The parameters to be passed to the _callback_.
- Note:
- Notification will be available only for jobs created after this subscription.
- """
- from ... import core as tp
- return tp.subscribe_scenario(callback, params, self)
- def unsubscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None):
- """Unsubscribe a function that is called when the status of a `Job^` changes.
- Parameters:
- callback (Callable[[Scenario^, Job^], None]): The callable function to unsubscribe.
- params (Optional[List[Any]]): The parameters to be passed to the _callback_.
- Note:
- The function will continue to be called for ongoing jobs.
- """
- from ... import core as tp
- return tp.unsubscribe_scenario(callback, params, self)
- def submit(
- self,
- callbacks: Optional[List[Callable]] = None,
- force: bool = False,
- wait: bool = False,
- timeout: Optional[Union[float, int]] = None,
- **properties,
- ) -> Submission:
- """Submit this scenario for execution.
- All the `Task^`s of the scenario will be submitted for execution.
- Parameters:
- callbacks (List[Callable]): The list of callable functions to be called on status
- change.
- force (bool): Force execution even if the data nodes are in cache.
- wait (bool): Wait for the orchestrated jobs created from the scenario submission to be finished in
- asynchronous mode.
- timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
- before returning.
- **properties (dict[str, any]): A keyworded variable length list of additional arguments.
- Returns:
- A `Submission^` containing the information of the submission.
- """
- from ._scenario_manager_factory import _ScenarioManagerFactory
- return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
- def export(
- self,
- folder_path: Union[str, pathlib.Path],
- override: bool = False,
- include_data: bool = False,
- ):
- """Export all related entities of this scenario to a folder.
- Parameters:
- folder_path (Union[str, pathlib.Path]): The folder path to export the scenario to.
- If the path exists and the override parameter is False, an exception is raised.
- override (bool): If True, the existing folder will be overridden. Default is False.
- include_data (bool): If True, the file-based data nodes are exported as well.
- This includes Pickle, CSV, Excel, Parquet, and JSON data nodes.
- If the scenario has a data node that is not file-based, a warning will be logged, and the data node
- will not be exported. The default value is False.
- Raises:
- ExportFolderAlreadyExist^: If the `folder_path` already exists and the override parameter is False.
- """
- from ... import core as tp
- return tp.export_scenario(self.id, folder_path, override, include_data)
- def set_primary(self):
- """Promote the scenario as the primary scenario of its cycle.
- If the cycle already has a primary scenario, it will be demoted, and it will no longer
- be primary for the cycle.
- """
- from ... import core as tp
- return tp.set_primary(self)
- def add_tag(self, tag: str):
- """Add a tag to this scenario.
- If the scenario's cycle already have another scenario tagged with _tag_ the other
- scenario will be untagged.
- Parameters:
- tag (str): The tag to add to this scenario.
- """
- from ... import core as tp
- return tp.tag(self, tag)
- def remove_tag(self, tag: str):
- """Remove a tag from this scenario.
- Parameters:
- tag (str): The tag to remove from the set of the scenario's tags.
- """
- from ... import core as tp
- return tp.untag(self, tag)
- def is_deletable(self) -> bool:
- """Indicate if the scenario can be deleted.
- Returns:
- True if the scenario can be deleted. False otherwise.
- """
- from ... import core as tp
- return tp.is_deletable(self)
- def get_label(self) -> str:
- """Returns the scenario simple label prefixed by its owner label.
- Returns:
- The label of the scenario as a string.
- """
- return self._get_label()
- def get_simple_label(self) -> str:
- """Returns the scenario simple label.
- Returns:
- The simple label of the scenario as a string.
- """
- return self._get_simple_label()
- def _is_consistent(self) -> bool:
- dag = self._build_dag()
- if dag.number_of_nodes() == 0:
- return True
- if not nx.is_directed_acyclic_graph(dag):
- return False
- for left_node, right_node in dag.edges:
- if (isinstance(left_node, DataNode) and isinstance(right_node, Task)) or (
- isinstance(left_node, Task) and isinstance(right_node, DataNode)
- ):
- continue
- return False
- return True
- @_make_event.register(Scenario)
- def _make_event_for_scenario(
- scenario: Scenario,
- operation: EventOperation,
- /,
- attribute_name: Optional[str] = None,
- attribute_value: Optional[Any] = None,
- **kwargs,
- ) -> Event:
- metadata = {"config_id": scenario.config_id, "version": scenario._version, **kwargs}
- return Event(
- entity_type=EventEntityType.SCENARIO,
- entity_id=scenario.id,
- operation=operation,
- attribute_name=attribute_name,
- attribute_value=attribute_value,
- metadata=metadata,
- )
|