|
@@ -63,7 +63,7 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
It is not recommended to instantiate a `Scenario` directly. Instead, it should be
|
|
|
created with the `create_scenario()^` function.
|
|
|
|
|
|
- !!! Example
|
|
|
+ ??? Example
|
|
|
|
|
|
```python
|
|
|
import taipy as tp
|
|
@@ -92,22 +92,6 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
# Retrieve all scenarios
|
|
|
all_scenarios = tp.get_scenarios()
|
|
|
```
|
|
|
-
|
|
|
- Attributes:
|
|
|
- config_id (str): The identifier of the `ScenarioConfig^`.
|
|
|
- tasks (Set[Task^]): The set of tasks.
|
|
|
- additional_data_nodes (Set[DataNode^]): The set of additional data nodes.
|
|
|
- sequences (Dict[str, Sequence^]): The dictionary of sequences: subsets of tasks that can be submitted
|
|
|
- together independently of the rest of the scenario's tasks.
|
|
|
- properties (dict[str, Any]): A dictionary of additional properties.
|
|
|
- scenario_id (str): The unique identifier of this scenario.
|
|
|
- creation_date (datetime): The date and time of the scenario's creation.
|
|
|
- is_primary (bool): True if the scenario is the primary of its cycle. False otherwise.
|
|
|
- cycle (Cycle^): The cycle of the scenario.
|
|
|
- subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
|
|
|
- tags (Set[str]): The list of scenario's tags.
|
|
|
- version (str): The string indicates the application version of the scenario to instantiate.
|
|
|
- If not provided, the latest version is used.
|
|
|
"""
|
|
|
|
|
|
_ID_PREFIX = "SCENARIO"
|
|
@@ -119,6 +103,9 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
_SEQUENCE_SUBSCRIBERS_KEY = "subscribers"
|
|
|
__CHECK_INIT_DONE_ATTR_NAME = "_init_done"
|
|
|
|
|
|
+ id: ScenarioId
|
|
|
+ """The unique identifier of this scenario."""
|
|
|
+
|
|
|
def __init__(
|
|
|
self,
|
|
|
config_id: str,
|
|
@@ -158,24 +145,21 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
|
|
|
self._init_done = True
|
|
|
|
|
|
- @staticmethod
|
|
|
- def _new_id(config_id: str) -> ScenarioId:
|
|
|
- """Generate a unique scenario identifier."""
|
|
|
- return ScenarioId(Scenario.__SEPARATOR.join([Scenario._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())]))
|
|
|
-
|
|
|
def __getstate__(self):
|
|
|
return self.id
|
|
|
|
|
|
- def __setstate__(self, id):
|
|
|
+ def __setstate__(self, id) -> None:
|
|
|
from ... import core as tp
|
|
|
|
|
|
sc = tp.get(id)
|
|
|
self.__dict__ = sc.__dict__
|
|
|
|
|
|
- def __hash__(self):
|
|
|
+ def __hash__(self) -> int:
|
|
|
+ """Return the hash of the scenario."""
|
|
|
return hash(self.id)
|
|
|
|
|
|
- def __eq__(self, other):
|
|
|
+ def __eq__(self, other) -> bool:
|
|
|
+ """Check if the scenario is equal to another scenario."""
|
|
|
return isinstance(other, Scenario) and self.id == other.id
|
|
|
|
|
|
def __setattr__(self, name: str, value: Any) -> None:
|
|
@@ -188,7 +172,20 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
except AttributeError:
|
|
|
return super().__setattr__(name, value)
|
|
|
|
|
|
- def __getattr__(self, attribute_name) -> Union[Sequence, Task, DataNode]:
|
|
|
+ def __getattr__(self, attribute_name: str) -> Union[Sequence, Task, DataNode]:
|
|
|
+ """Get a scenario attribute by its name.
|
|
|
+
|
|
|
+ The attribute can be a sequence, a task, or a data node.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ attribute_name (str): The name of the attribute to get.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ The attribute with the given name.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ AttributeError: If the attribute is not found.
|
|
|
+ """
|
|
|
protected_attribute_name = _validate_id(attribute_name)
|
|
|
sequences = self._get_sequences()
|
|
|
if protected_attribute_name in sequences:
|
|
@@ -203,351 +200,143 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
raise AttributeError(f"{attribute_name} is not an attribute of scenario {self.id}")
|
|
|
|
|
|
@property
|
|
|
- def config_id(self):
|
|
|
+ def config_id(self) -> str:
|
|
|
+ """The identifier of the `ScenarioConfig^`."""
|
|
|
return self._config_id
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
def sequences(self) -> Dict[str, Sequence]:
|
|
|
+ """The dictionary of the scenario's sequences.
|
|
|
+
|
|
|
+ The sequences are subsets of tasks that can be submitted together independently of
|
|
|
+ the rest of the scenario's tasks."""
|
|
|
return self._get_sequences()
|
|
|
|
|
|
@sequences.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
def sequences(
|
|
|
self, sequences: Dict[str, Dict[str, Union[List[Task], List[TaskId], _ListAttributes, List[_Subscriber], Dict]]]
|
|
|
- ):
|
|
|
+ ) -> None:
|
|
|
self._sequences = sequences
|
|
|
actual_sequences = self._get_sequences()
|
|
|
for sequence_name in sequences.keys():
|
|
|
if not actual_sequences[sequence_name]._is_consistent():
|
|
|
raise InvalidSequence(actual_sequences[sequence_name].id)
|
|
|
|
|
|
- def add_sequence(
|
|
|
- self,
|
|
|
- name: str,
|
|
|
- tasks: Union[List[Task], List[TaskId]],
|
|
|
- properties: Optional[Dict] = None,
|
|
|
- subscribers: Optional[List[_Subscriber]] = None,
|
|
|
- ):
|
|
|
- """Add a sequence to the scenario.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- name (str): The name of the sequence.
|
|
|
- tasks (Union[List[Task], List[TaskId]]): The list of scenario's tasks to add to the sequence.
|
|
|
- properties (Optional[Dict]): The optional properties of the sequence.
|
|
|
- subscribers (Optional[List[_Subscriber]]): The optional list of callbacks to be called on
|
|
|
- `Job^`'s status change.
|
|
|
-
|
|
|
- Raises:
|
|
|
- SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
|
|
|
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
- """
|
|
|
- if name in self.sequences:
|
|
|
- raise SequenceAlreadyExists(name, self.id)
|
|
|
- seq = self._set_sequence(name, tasks, properties, subscribers)
|
|
|
- Notifier.publish(_make_event(seq, EventOperation.CREATION))
|
|
|
-
|
|
|
- def update_sequence(
|
|
|
- self,
|
|
|
- name: str,
|
|
|
- tasks: Union[List[Task], List[TaskId]],
|
|
|
- properties: Optional[Dict] = None,
|
|
|
- subscribers: Optional[List[_Subscriber]] = None,
|
|
|
- ):
|
|
|
- """Update an existing sequence.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- name (str): The name of the sequence to update.
|
|
|
- tasks (Union[List[Task], List[TaskId]]): The new list of scenario's tasks.
|
|
|
- properties (Optional[Dict]): The new properties of the sequence.
|
|
|
- subscribers (Optional[List[_Subscriber]]): The new list of callbacks to be called on `Job^`'s status change.
|
|
|
-
|
|
|
- Raises:
|
|
|
- SequenceTaskDoesNotExistInScenario^: If a task in the list does not exist in the scenario.
|
|
|
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
- """
|
|
|
- if name not in self.sequences:
|
|
|
- raise NonExistingSequence(name, self.id)
|
|
|
- seq = self._set_sequence(name, tasks, properties, subscribers)
|
|
|
- Notifier.publish(_make_event(seq, EventOperation.UPDATE))
|
|
|
-
|
|
|
- def _set_sequence(
|
|
|
- self,
|
|
|
- name: str,
|
|
|
- tasks: Union[List[Task], List[TaskId]],
|
|
|
- properties: Optional[Dict] = None,
|
|
|
- subscribers: Optional[List[_Subscriber]] = None,
|
|
|
- ) -> Sequence:
|
|
|
- _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
|
|
|
- _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
|
|
|
- _sequence_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
|
|
|
- self.__check_sequence_tasks_exist_in_scenario_tasks(name, _sequence_task_ids, self.id, _scenario_task_ids)
|
|
|
-
|
|
|
- from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
|
|
|
-
|
|
|
- seq_manager = _SequenceManagerFactory._build_manager()
|
|
|
- seq = seq_manager._create(name, tasks, subscribers or [], properties or {}, self.id, self.version)
|
|
|
-
|
|
|
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
- _sequences.update(
|
|
|
- {
|
|
|
- name: {
|
|
|
- self._SEQUENCE_TASKS_KEY: tasks,
|
|
|
- self._SEQUENCE_PROPERTIES_KEY: properties or {},
|
|
|
- self._SEQUENCE_SUBSCRIBERS_KEY: subscribers or [],
|
|
|
- }
|
|
|
- }
|
|
|
- )
|
|
|
- self.sequences = _sequences # type: ignore
|
|
|
- return seq
|
|
|
-
|
|
|
- def add_sequences(self, sequences: Dict[str, Union[List[Task], List[TaskId]]]):
|
|
|
- """Add multiple sequences to the scenario.
|
|
|
-
|
|
|
- Note:
|
|
|
- To provide properties and subscribers for the sequences, use `Scenario.add_sequence^` instead.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- sequences (Dict[str, Union[List[Task], List[TaskId]]]):
|
|
|
- A dictionary containing sequences to add. Each key is a sequence name, and the value must
|
|
|
- be a list of the scenario tasks.
|
|
|
-
|
|
|
- Raises:
|
|
|
- SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
|
|
|
- """
|
|
|
- _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
|
|
|
- _sc_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
|
|
|
- for name, tasks in sequences.items():
|
|
|
- _seq_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
|
|
|
- self.__check_sequence_tasks_exist_in_scenario_tasks(name, _seq_task_ids, self.id, _sc_task_ids)
|
|
|
- # Need to parse twice the sequences to avoid adding some sequences and not others in case of exception
|
|
|
- for name, tasks in sequences.items():
|
|
|
- self.add_sequence(name, tasks)
|
|
|
-
|
|
|
- def remove_sequence(self, name: str):
|
|
|
- """Remove a sequence from the scenario.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- name (str): The name of the sequence to remove.
|
|
|
- """
|
|
|
- seq_id = self.sequences[name].id
|
|
|
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
- _sequences.pop(name)
|
|
|
- self.sequences = _sequences # type: ignore
|
|
|
- Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
|
|
|
-
|
|
|
- def remove_sequences(self, sequence_names: List[str]):
|
|
|
- """
|
|
|
- Remove multiple sequences from the scenario.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- sequence_names (List[str]): A list of sequence names to remove.
|
|
|
- """
|
|
|
- _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
- for sequence_name in sequence_names:
|
|
|
- seq_id = self.sequences[sequence_name].id
|
|
|
- _sequences.pop(sequence_name)
|
|
|
- Notifier.publish(
|
|
|
- Event(
|
|
|
- EventEntityType.SEQUENCE,
|
|
|
- EventOperation.DELETION,
|
|
|
- entity_id=seq_id,
|
|
|
- )
|
|
|
- )
|
|
|
- self.sequences = _sequences # type: ignore
|
|
|
-
|
|
|
- def rename_sequence(self, old_name, new_name):
|
|
|
- """Rename a sequence of the scenario.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- old_name (str): The current name of the sequence to rename.
|
|
|
- new_name (str): The new name of the sequence.
|
|
|
-
|
|
|
- Raises:
|
|
|
- SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
- """
|
|
|
- if old_name == new_name:
|
|
|
- return
|
|
|
- if new_name in self.sequences:
|
|
|
- raise SequenceAlreadyExists(new_name, self.id)
|
|
|
- self._sequences[new_name] = self._sequences[old_name]
|
|
|
- del self._sequences[old_name]
|
|
|
- self.sequences = self._sequences # type: ignore
|
|
|
- Notifier.publish(
|
|
|
- Event(
|
|
|
- EventEntityType.SCENARIO,
|
|
|
- EventOperation.UPDATE,
|
|
|
- entity_id=self.id,
|
|
|
- attribute_name="sequences",
|
|
|
- attribute_value=self._sequences,
|
|
|
- )
|
|
|
- )
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def __check_sequence_tasks_exist_in_scenario_tasks(
|
|
|
- sequence_name: str, sequence_task_ids: Set[TaskId], scenario_id: ScenarioId, scenario_task_ids: Set[TaskId]
|
|
|
- ):
|
|
|
- non_existing_sequence_task_ids_in_scenario = set()
|
|
|
- for sequence_task_id in sequence_task_ids:
|
|
|
- if sequence_task_id not in scenario_task_ids:
|
|
|
- non_existing_sequence_task_ids_in_scenario.add(sequence_task_id)
|
|
|
- if len(non_existing_sequence_task_ids_in_scenario) > 0:
|
|
|
- raise SequenceTaskDoesNotExistInScenario(
|
|
|
- list(non_existing_sequence_task_ids_in_scenario), sequence_name, scenario_id
|
|
|
- )
|
|
|
-
|
|
|
- def _get_sequences(self) -> Dict[str, Sequence]:
|
|
|
- _sequences = {}
|
|
|
-
|
|
|
- from ..sequence._sequence_manager_factory import _SequenceManagerFactory
|
|
|
-
|
|
|
- sequence_manager = _SequenceManagerFactory._build_manager()
|
|
|
-
|
|
|
- for sequence_name, sequence_data in self._sequences.items():
|
|
|
- sequence = sequence_manager._build_sequence(
|
|
|
- sequence_name,
|
|
|
- sequence_data.get(self._SEQUENCE_TASKS_KEY, []),
|
|
|
- sequence_data.get(self._SEQUENCE_SUBSCRIBERS_KEY, []),
|
|
|
- sequence_data.get(self._SEQUENCE_PROPERTIES_KEY, {}),
|
|
|
- self.id,
|
|
|
- self.version,
|
|
|
- )
|
|
|
- if not isinstance(sequence, Sequence):
|
|
|
- raise NonExistingSequence(sequence_name, self.id)
|
|
|
- _sequences[sequence_name] = sequence
|
|
|
- return _sequences
|
|
|
-
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
def tasks(self) -> Dict[str, Task]:
|
|
|
+ """The dictionary of the scenario's tasks."""
|
|
|
return self.__get_tasks()
|
|
|
|
|
|
- def __get_tasks(self) -> Dict[str, Task]:
|
|
|
- from ..task._task_manager_factory import _TaskManagerFactory
|
|
|
-
|
|
|
- _tasks = {}
|
|
|
- task_manager = _TaskManagerFactory._build_manager()
|
|
|
-
|
|
|
- for task_or_id in self._tasks:
|
|
|
- t = task_manager._get(task_or_id, task_or_id)
|
|
|
-
|
|
|
- if not isinstance(t, Task):
|
|
|
- raise NonExistingTask(task_or_id)
|
|
|
- _tasks[t.config_id] = t
|
|
|
- return _tasks
|
|
|
-
|
|
|
@tasks.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def tasks(self, val: Union[Set[TaskId], Set[Task]]):
|
|
|
+ def tasks(self, val: Union[Set[TaskId], Set[Task]]) -> None:
|
|
|
self._tasks = set(val)
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
def additional_data_nodes(self) -> Dict[str, DataNode]:
|
|
|
- return self.__get_additional_data_nodes()
|
|
|
+ """The dictionary of the scenario's additional data nodes.
|
|
|
|
|
|
- def __get_additional_data_nodes(self):
|
|
|
- from ..data._data_manager_factory import _DataManagerFactory
|
|
|
-
|
|
|
- additional_data_nodes = {}
|
|
|
- data_manager = _DataManagerFactory._build_manager()
|
|
|
-
|
|
|
- for dn_or_id in self._additional_data_nodes:
|
|
|
- dn = data_manager._get(dn_or_id, dn_or_id)
|
|
|
-
|
|
|
- if not isinstance(dn, DataNode):
|
|
|
- raise NonExistingDataNode(dn_or_id)
|
|
|
- additional_data_nodes[dn.config_id] = dn
|
|
|
- return additional_data_nodes
|
|
|
+ Additional data nodes are data nodes that are not part of the
|
|
|
+ scenario's graph but are used to store extra data. They are not
|
|
|
+ connected to the scenario's tasks."""
|
|
|
+ return self.__get_additional_data_nodes()
|
|
|
|
|
|
@additional_data_nodes.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def additional_data_nodes(self, val: Union[Set[TaskId], Set[DataNode]]):
|
|
|
+ def additional_data_nodes(self, val: Union[Set[TaskId], Set[DataNode]]) -> None:
|
|
|
self._additional_data_nodes = set(val)
|
|
|
|
|
|
- def _get_set_of_tasks(self) -> Set[Task]:
|
|
|
- return set(self.tasks.values())
|
|
|
-
|
|
|
- def __get_data_nodes(self) -> Dict[str, DataNode]:
|
|
|
- data_nodes_dict = self.__get_additional_data_nodes()
|
|
|
- for _, task in self.__get_tasks().items():
|
|
|
- data_nodes_dict.update(task.data_nodes)
|
|
|
- return data_nodes_dict
|
|
|
-
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
def data_nodes(self) -> Dict[str, DataNode]:
|
|
|
+ """The dictionary of the scenario's data nodes."""
|
|
|
return self.__get_data_nodes()
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def creation_date(self):
|
|
|
+ def creation_date(self) -> datetime:
|
|
|
+ """The date and time of the scenario's creation."""
|
|
|
return self._creation_date
|
|
|
|
|
|
@creation_date.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def creation_date(self, val):
|
|
|
+ def creation_date(self, val) -> None:
|
|
|
self._creation_date = val
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def cycle(self):
|
|
|
+ def cycle(self) -> Optional[Cycle]:
|
|
|
+ """The cycle of the scenario"""
|
|
|
return self._cycle
|
|
|
|
|
|
@cycle.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def cycle(self, val):
|
|
|
+ def cycle(self, val) -> None:
|
|
|
self._cycle = val
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def is_primary(self):
|
|
|
+ def is_primary(self) -> bool:
|
|
|
+ """True if the scenario is the primary of its cycle. False otherwise."""
|
|
|
return self._primary_scenario
|
|
|
|
|
|
@is_primary.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def is_primary(self, val):
|
|
|
+ def is_primary(self, val) -> None:
|
|
|
self._primary_scenario = val
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def subscribers(self):
|
|
|
+ def subscribers(self) -> _ListAttributes:
|
|
|
+ """The list of callbacks to be called on `Job^`'s status change."""
|
|
|
return self._subscribers
|
|
|
|
|
|
@subscribers.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def subscribers(self, val):
|
|
|
+ def subscribers(self, val) -> None:
|
|
|
self._subscribers = _ListAttributes(self, val)
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def tags(self):
|
|
|
+ def tags(self) -> Set[str]:
|
|
|
+ """The set of scenario's tags."""
|
|
|
return self._tags
|
|
|
|
|
|
@tags.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
- def tags(self, val):
|
|
|
+ def tags(self, val) -> None:
|
|
|
self._tags = val or set()
|
|
|
|
|
|
@property
|
|
|
- def version(self):
|
|
|
+ def version(self) -> str:
|
|
|
+ """The application version of the scenario.
|
|
|
+
|
|
|
+ The string indicates the application version of the scenario. If not
|
|
|
+ provided, the latest version is used."""
|
|
|
return self._version
|
|
|
|
|
|
@property
|
|
|
- def owner_id(self):
|
|
|
- return self._cycle.id
|
|
|
+ def owner_id(self) -> Optional[str]:
|
|
|
+ """The identifier of the scenario cycle."""
|
|
|
+ return self._cycle.id if self._cycle else None
|
|
|
|
|
|
@property
|
|
|
- def properties(self):
|
|
|
+ def properties(self) -> _Properties:
|
|
|
+ """The dictionary of additional properties."""
|
|
|
self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
|
|
|
return self._properties
|
|
|
|
|
|
@property # type: ignore
|
|
|
def name(self) -> Optional[str]:
|
|
|
+ """The human-readable name of the scenario."""
|
|
|
return self.properties.get("name")
|
|
|
|
|
|
@name.setter # type: ignore
|
|
|
- def name(self, val):
|
|
|
+ def name(self, val) -> None:
|
|
|
self.properties["name"] = val
|
|
|
|
|
|
def has_tag(self, tag: str) -> bool:
|
|
@@ -555,50 +344,38 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
|
|
|
Parameters:
|
|
|
tag (str): The tag to search among the set of scenario's tags.
|
|
|
+
|
|
|
Returns:
|
|
|
True if the scenario has the tag given as parameter. False otherwise.
|
|
|
"""
|
|
|
return tag in self.tags
|
|
|
|
|
|
- def _add_tag(self, tag: str):
|
|
|
- self._tags = _Reloader()._reload("scenario", self)._tags
|
|
|
- self._tags.add(tag)
|
|
|
-
|
|
|
- def _remove_tag(self, tag: str):
|
|
|
- self._tags = _Reloader()._reload("scenario", self)._tags
|
|
|
- if self.has_tag(tag):
|
|
|
- self._tags.remove(tag)
|
|
|
-
|
|
|
- def subscribe(
|
|
|
- self,
|
|
|
- callback: Callable[[Scenario, Job], None],
|
|
|
- params: Optional[List[Any]] = None,
|
|
|
- ):
|
|
|
+ def subscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None) -> None:
|
|
|
"""Subscribe a function to be called on `Job^` status change.
|
|
|
|
|
|
The subscription is applied to all jobs created from the scenario's execution.
|
|
|
|
|
|
+ Note:
|
|
|
+ Notification will be available only for jobs created after this subscription.
|
|
|
+
|
|
|
Parameters:
|
|
|
callback (Callable[[Scenario^, Job^], None]): The callable function to be called
|
|
|
on status change.
|
|
|
params (Optional[List[Any]]): The parameters to be passed to the _callback_.
|
|
|
-
|
|
|
- Note:
|
|
|
- Notification will be available only for jobs created after this subscription.
|
|
|
"""
|
|
|
from ... import core as tp
|
|
|
|
|
|
return tp.subscribe_scenario(callback, params, self)
|
|
|
|
|
|
- def unsubscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None):
|
|
|
+ def unsubscribe(self, callback: Callable[[Scenario, Job], None], params: Optional[List[Any]] = None) -> None:
|
|
|
"""Unsubscribe a function that is called when the status of a `Job^` changes.
|
|
|
|
|
|
+ Note:
|
|
|
+ The function will continue to be called for ongoing jobs.
|
|
|
+
|
|
|
Parameters:
|
|
|
callback (Callable[[Scenario^, Job^], None]): The callable function to unsubscribe.
|
|
|
params (Optional[List[Any]]): The parameters to be passed to the _callback_.
|
|
|
-
|
|
|
- Note:
|
|
|
- The function will continue to be called for ongoing jobs.
|
|
|
"""
|
|
|
from ... import core as tp
|
|
|
|
|
@@ -626,6 +403,7 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
before returning.<br/>
|
|
|
If not provided and *wait* is True, the function waits indefinitely.
|
|
|
**properties (dict[str, any]): A keyworded variable length list of additional arguments.
|
|
|
+
|
|
|
Returns:
|
|
|
A `Submission^` containing the information of the submission.
|
|
|
"""
|
|
@@ -633,7 +411,7 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
|
|
|
return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
|
|
|
|
|
|
- def set_primary(self):
|
|
|
+ def set_primary(self) -> None:
|
|
|
"""Promote the scenario as the primary scenario of its cycle.
|
|
|
|
|
|
If the cycle already has a primary scenario, it will be demoted, and it will no longer
|
|
@@ -643,7 +421,7 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
|
|
|
return tp.set_primary(self)
|
|
|
|
|
|
- def add_tag(self, tag: str):
|
|
|
+ def add_tag(self, tag: str) -> None:
|
|
|
"""Add a tag to this scenario.
|
|
|
|
|
|
If the scenario's cycle already have another scenario tagged with _tag_ the other
|
|
@@ -656,7 +434,7 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
|
|
|
return tp.tag(self, tag)
|
|
|
|
|
|
- def remove_tag(self, tag: str):
|
|
|
+ def remove_tag(self, tag: str) -> None:
|
|
|
"""Remove a tag from this scenario.
|
|
|
|
|
|
Parameters:
|
|
@@ -692,7 +470,143 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
"""
|
|
|
return self._get_simple_label()
|
|
|
|
|
|
+ def add_sequence(
|
|
|
+ self,
|
|
|
+ name: str,
|
|
|
+ tasks: Union[List[Task], List[TaskId]],
|
|
|
+ properties: Optional[Dict] = None,
|
|
|
+ subscribers: Optional[List[_Subscriber]] = None,
|
|
|
+ ) -> None:
|
|
|
+ """Add a sequence to the scenario.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ name (str): The name of the sequence.
|
|
|
+ tasks (Union[List[Task], List[TaskId]]): The list of scenario's tasks to add to the sequence.
|
|
|
+ properties (Optional[Dict]): The optional properties of the sequence.
|
|
|
+ subscribers (Optional[List[_Subscriber]]): The optional list of callbacks to be called on
|
|
|
+ `Job^`'s status change.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
|
|
|
+ SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
+ """
|
|
|
+ if name in self.sequences:
|
|
|
+ raise SequenceAlreadyExists(name, self.id)
|
|
|
+ seq = self._set_sequence(name, tasks, properties, subscribers)
|
|
|
+ Notifier.publish(_make_event(seq, EventOperation.CREATION))
|
|
|
+
|
|
|
+ def update_sequence(
|
|
|
+ self,
|
|
|
+ name: str,
|
|
|
+ tasks: Union[List[Task], List[TaskId]],
|
|
|
+ properties: Optional[Dict] = None,
|
|
|
+ subscribers: Optional[List[_Subscriber]] = None,
|
|
|
+ ) -> None:
|
|
|
+ """Update an existing sequence.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ name (str): The name of the sequence to update.
|
|
|
+ tasks (Union[List[Task], List[TaskId]]): The new list of scenario's tasks.
|
|
|
+ properties (Optional[Dict]): The new properties of the sequence.
|
|
|
+ subscribers (Optional[List[_Subscriber]]): The new list of callbacks to be called on `Job^`'s status change.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ SequenceTaskDoesNotExistInScenario^: If a task in the list does not exist in the scenario.
|
|
|
+ SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
+ """
|
|
|
+ if name not in self.sequences:
|
|
|
+ raise NonExistingSequence(name, self.id)
|
|
|
+ seq = self._set_sequence(name, tasks, properties, subscribers)
|
|
|
+ Notifier.publish(_make_event(seq, EventOperation.UPDATE))
|
|
|
+
|
|
|
+ def add_sequences(self, sequences: Dict[str, Union[List[Task], List[TaskId]]]) -> None:
|
|
|
+ """Add multiple sequences to the scenario.
|
|
|
+
|
|
|
+ Note:
|
|
|
+ To provide properties and subscribers for the sequences, use `Scenario.add_sequence^` instead.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ sequences (Dict[str, Union[List[Task], List[TaskId]]]):
|
|
|
+ A dictionary containing sequences to add. Each key is a sequence name, and the value must
|
|
|
+ be a list of the scenario tasks.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ SequenceTaskDoesNotExistInScenario^: If a task in the sequence does not exist in the scenario.
|
|
|
+ """
|
|
|
+ _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
|
|
|
+ _sc_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
|
|
|
+ for name, tasks in sequences.items():
|
|
|
+ _seq_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
|
|
|
+ self.__check_sequence_tasks_exist_in_scenario_tasks(name, _seq_task_ids, self.id, _sc_task_ids)
|
|
|
+ # Need to parse twice the sequences to avoid adding some sequences and not others in case of exception
|
|
|
+ for name, tasks in sequences.items():
|
|
|
+ self.add_sequence(name, tasks)
|
|
|
+
|
|
|
+ def remove_sequence(self, name: str) -> None:
|
|
|
+ """Remove a sequence from the scenario.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ name (str): The name of the sequence to remove.
|
|
|
+ """
|
|
|
+ seq_id = self.sequences[name].id
|
|
|
+ _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
+ _sequences.pop(name)
|
|
|
+ self.sequences = _sequences # type: ignore
|
|
|
+ Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
|
|
|
+
|
|
|
+ def remove_sequences(self, sequence_names: List[str]) -> None:
|
|
|
+ """Remove multiple sequences from the scenario.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ sequence_names (List[str]): A list of sequence names to remove.
|
|
|
+ """
|
|
|
+ _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
+ for sequence_name in sequence_names:
|
|
|
+ seq_id = self.sequences[sequence_name].id
|
|
|
+ _sequences.pop(sequence_name)
|
|
|
+ Notifier.publish(
|
|
|
+ Event(
|
|
|
+ EventEntityType.SEQUENCE,
|
|
|
+ EventOperation.DELETION,
|
|
|
+ entity_id=seq_id,
|
|
|
+ )
|
|
|
+ )
|
|
|
+ self.sequences = _sequences # type: ignore
|
|
|
+
|
|
|
+ def rename_sequence(self, old_name, new_name) -> None:
|
|
|
+ """Rename a scenario sequence.
|
|
|
+
|
|
|
+ Parameters:
|
|
|
+ old_name (str): The current name of the sequence to rename.
|
|
|
+ new_name (str): The new name of the sequence.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ SequenceAlreadyExists^: If a sequence with the same name already exists in the scenario.
|
|
|
+ """
|
|
|
+ if old_name == new_name:
|
|
|
+ return
|
|
|
+ if new_name in self.sequences:
|
|
|
+ raise SequenceAlreadyExists(new_name, self.id)
|
|
|
+ self._sequences[new_name] = self._sequences[old_name]
|
|
|
+ del self._sequences[old_name]
|
|
|
+ self.sequences = self._sequences # type: ignore
|
|
|
+ Notifier.publish(
|
|
|
+ Event(
|
|
|
+ EventEntityType.SCENARIO,
|
|
|
+ EventOperation.UPDATE,
|
|
|
+ entity_id=self.id,
|
|
|
+ attribute_name="sequences",
|
|
|
+ attribute_value=self._sequences,
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _new_id(config_id: str) -> ScenarioId:
|
|
|
+ """Generate a unique scenario identifier."""
|
|
|
+ return ScenarioId(Scenario.__SEPARATOR.join([Scenario._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())]))
|
|
|
+
|
|
|
def _is_consistent(self) -> bool:
|
|
|
+ """Check if the scenario is consistent."""
|
|
|
dag = self._build_dag()
|
|
|
if dag.number_of_nodes() == 0:
|
|
|
return True
|
|
@@ -706,6 +620,116 @@ class Scenario(_Entity, Submittable, _Labeled):
|
|
|
return False
|
|
|
return True
|
|
|
|
|
|
+ def _add_tag(self, tag: str) -> None:
|
|
|
+ self._tags = _Reloader()._reload("scenario", self)._tags
|
|
|
+ self._tags.add(tag)
|
|
|
+
|
|
|
+ def _remove_tag(self, tag: str) -> None:
|
|
|
+ self._tags = _Reloader()._reload("scenario", self)._tags
|
|
|
+ if self.has_tag(tag):
|
|
|
+ self._tags.remove(tag)
|
|
|
+
|
|
|
+ def _get_set_of_tasks(self) -> Set[Task]:
|
|
|
+ return set(self.tasks.values())
|
|
|
+
|
|
|
+ def __get_data_nodes(self) -> Dict[str, DataNode]:
|
|
|
+ data_nodes_dict = self.__get_additional_data_nodes()
|
|
|
+ for _, task in self.__get_tasks().items():
|
|
|
+ data_nodes_dict.update(task.data_nodes)
|
|
|
+ return data_nodes_dict
|
|
|
+
|
|
|
+ def __get_additional_data_nodes(self):
|
|
|
+ from ..data._data_manager_factory import _DataManagerFactory
|
|
|
+
|
|
|
+ additional_data_nodes = {}
|
|
|
+ data_manager = _DataManagerFactory._build_manager()
|
|
|
+
|
|
|
+ for dn_or_id in self._additional_data_nodes:
|
|
|
+ dn = data_manager._get(dn_or_id, dn_or_id)
|
|
|
+
|
|
|
+ if not isinstance(dn, DataNode):
|
|
|
+ raise NonExistingDataNode(dn_or_id)
|
|
|
+ additional_data_nodes[dn.config_id] = dn
|
|
|
+ return additional_data_nodes
|
|
|
+
|
|
|
+ def __get_tasks(self) -> Dict[str, Task]:
|
|
|
+ from ..task._task_manager_factory import _TaskManagerFactory
|
|
|
+
|
|
|
+ _tasks = {}
|
|
|
+ task_manager = _TaskManagerFactory._build_manager()
|
|
|
+
|
|
|
+ for task_or_id in self._tasks:
|
|
|
+ t = task_manager._get(task_or_id, task_or_id)
|
|
|
+
|
|
|
+ if not isinstance(t, Task):
|
|
|
+ raise NonExistingTask(task_or_id)
|
|
|
+ _tasks[t.config_id] = t
|
|
|
+ return _tasks
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def __check_sequence_tasks_exist_in_scenario_tasks(
|
|
|
+ sequence_name: str, sequence_task_ids: Set[TaskId], scenario_id: ScenarioId, scenario_task_ids: Set[TaskId]
|
|
|
+ ):
|
|
|
+ non_existing_sequence_task_ids_in_scenario = set()
|
|
|
+ for sequence_task_id in sequence_task_ids:
|
|
|
+ if sequence_task_id not in scenario_task_ids:
|
|
|
+ non_existing_sequence_task_ids_in_scenario.add(sequence_task_id)
|
|
|
+ if len(non_existing_sequence_task_ids_in_scenario) > 0:
|
|
|
+ raise SequenceTaskDoesNotExistInScenario(
|
|
|
+ list(non_existing_sequence_task_ids_in_scenario), sequence_name, scenario_id
|
|
|
+ )
|
|
|
+
|
|
|
+ def _get_sequences(self) -> Dict[str, Sequence]:
|
|
|
+ _sequences = {}
|
|
|
+
|
|
|
+ from ..sequence._sequence_manager_factory import _SequenceManagerFactory
|
|
|
+
|
|
|
+ sequence_manager = _SequenceManagerFactory._build_manager()
|
|
|
+
|
|
|
+ for sequence_name, sequence_data in self._sequences.items():
|
|
|
+ sequence = sequence_manager._build_sequence(
|
|
|
+ sequence_name,
|
|
|
+ sequence_data.get(self._SEQUENCE_TASKS_KEY, []),
|
|
|
+ sequence_data.get(self._SEQUENCE_SUBSCRIBERS_KEY, []),
|
|
|
+ sequence_data.get(self._SEQUENCE_PROPERTIES_KEY, {}),
|
|
|
+ self.id,
|
|
|
+ self.version,
|
|
|
+ )
|
|
|
+ if not isinstance(sequence, Sequence):
|
|
|
+ raise NonExistingSequence(sequence_name, self.id)
|
|
|
+ _sequences[sequence_name] = sequence
|
|
|
+ return _sequences
|
|
|
+
|
|
|
+ def _set_sequence(
|
|
|
+ self,
|
|
|
+ name: str,
|
|
|
+ tasks: Union[List[Task], List[TaskId]],
|
|
|
+ properties: Optional[Dict] = None,
|
|
|
+ subscribers: Optional[List[_Subscriber]] = None,
|
|
|
+ ) -> Sequence:
|
|
|
+ _scenario = _Reloader()._reload(self._MANAGER_NAME, self)
|
|
|
+ _scenario_task_ids = {task.id if isinstance(task, Task) else task for task in _scenario._tasks}
|
|
|
+ _sequence_task_ids: Set[TaskId] = {task.id if isinstance(task, Task) else task for task in tasks}
|
|
|
+ self.__check_sequence_tasks_exist_in_scenario_tasks(name, _sequence_task_ids, self.id, _scenario_task_ids)
|
|
|
+
|
|
|
+ from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
|
|
|
+
|
|
|
+ seq_manager = _SequenceManagerFactory._build_manager()
|
|
|
+ seq = seq_manager._create(name, tasks, subscribers or [], properties or {}, self.id, self.version)
|
|
|
+
|
|
|
+ _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
|
|
|
+ _sequences.update(
|
|
|
+ {
|
|
|
+ name: {
|
|
|
+ self._SEQUENCE_TASKS_KEY: tasks,
|
|
|
+ self._SEQUENCE_PROPERTIES_KEY: properties or {},
|
|
|
+ self._SEQUENCE_SUBSCRIBERS_KEY: subscribers or [],
|
|
|
+ }
|
|
|
+ }
|
|
|
+ )
|
|
|
+ self.sequences = _sequences # type: ignore
|
|
|
+ return seq
|
|
|
+
|
|
|
|
|
|
@_make_event.register(Scenario)
|
|
|
def _make_event_for_scenario(
|