|
@@ -16,9 +16,9 @@ from ..notification import EventOperation, Notifier, _make_event
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
|
from ..data.data_node import DataNode, DataNodeId
|
|
|
- from ..scenario.scenario import ScenarioId
|
|
|
- from ..sequence.sequence import SequenceId
|
|
|
- from ..task.task import TaskId
|
|
|
+ from ..scenario.scenario import Scenario, ScenarioId
|
|
|
+ from ..sequence.sequence import Sequence, SequenceId
|
|
|
+ from ..task.task import Task, TaskId
|
|
|
|
|
|
|
|
|
class _ReadyToRunProperty:
|
|
@@ -33,61 +33,62 @@ class _ReadyToRunProperty:
|
|
|
Union["ScenarioId", "SequenceId", "TaskId"], Dict["DataNodeId", Set[str]]
|
|
|
] = defaultdict(lambda: defaultdict(set))
|
|
|
|
|
|
- @staticmethod
|
|
|
- def _publish_submittable_property_event(
|
|
|
- submittable_id: Union["ScenarioId", "SequenceId", "TaskId"], submittable_property
|
|
|
- ) -> None:
|
|
|
- from ..taipy import get as tp_get
|
|
|
+ @classmethod
|
|
|
+ def _add(cls, dn: "DataNode", reason: str) -> None:
|
|
|
+ from ..scenario.scenario import Scenario
|
|
|
+ from ..sequence.sequence import Sequence
|
|
|
+ from ..task.task import Task
|
|
|
|
|
|
- Notifier.publish(
|
|
|
- _make_event(
|
|
|
- tp_get(submittable_id),
|
|
|
- EventOperation.UPDATE,
|
|
|
- attribute_name=_ReadyToRunProperty.IS_SUBMITTABLE_PROPERTY_NAME,
|
|
|
- attribute_value=submittable_property,
|
|
|
- )
|
|
|
- )
|
|
|
+ parent_entities = dn.get_parents()
|
|
|
|
|
|
- @classmethod
|
|
|
- def __add(
|
|
|
- cls, submittable_id: Union["ScenarioId", "SequenceId", "TaskId"], datanode_id: "DataNodeId", reason: str
|
|
|
- ) -> None:
|
|
|
- cls._datanode_id_submittables[datanode_id].add(submittable_id)
|
|
|
- if submittable_id not in cls._submittable_id_datanodes:
|
|
|
- cls._publish_submittable_property_event(submittable_id, False)
|
|
|
- cls._submittable_id_datanodes[submittable_id][datanode_id].add(reason)
|
|
|
+ for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
|
|
|
+ _ReadyToRunProperty.__add(scenario_parent, dn, reason)
|
|
|
+ for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
|
|
|
+ _ReadyToRunProperty.__add(sequence_parent, dn, reason)
|
|
|
+ for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
|
|
|
+ _ReadyToRunProperty.__add(task_parent, dn, reason)
|
|
|
|
|
|
@classmethod
|
|
|
- def _remove(cls, datanode_id: "DataNodeId", reason: str) -> None:
|
|
|
+ def _remove(cls, datanode: "DataNode", reason: str) -> None:
|
|
|
# check the data node status to determine the reason to be removed
|
|
|
- submittable_ids: Set = cls._datanode_id_submittables.get(datanode_id, set())
|
|
|
+ submittable_ids: Set = cls._datanode_id_submittables.get(datanode.id, set())
|
|
|
|
|
|
to_remove_dn = False
|
|
|
for submittable_id in submittable_ids:
|
|
|
# check remove the reason
|
|
|
- if reason in cls._submittable_id_datanodes[submittable_id].get(datanode_id, set()):
|
|
|
- cls._submittable_id_datanodes[submittable_id].get(datanode_id, set()).remove(reason)
|
|
|
- if len(cls._submittable_id_datanodes[submittable_id][datanode_id]) == 0:
|
|
|
+ if reason in cls._submittable_id_datanodes[submittable_id].get(datanode.id, set()):
|
|
|
+ cls._submittable_id_datanodes[submittable_id].get(datanode.id, set()).remove(reason)
|
|
|
+ if len(cls._submittable_id_datanodes[submittable_id][datanode.id]) == 0:
|
|
|
to_remove_dn = True
|
|
|
- cls._submittable_id_datanodes[submittable_id].pop(datanode_id, None)
|
|
|
+ cls._submittable_id_datanodes[submittable_id].pop(datanode.id, None)
|
|
|
if len(cls._submittable_id_datanodes[submittable_id]) == 0:
|
|
|
- cls._publish_submittable_property_event(submittable_id, True)
|
|
|
+ from ..taipy import get as tp_get
|
|
|
+
|
|
|
+ submittable = tp_get(submittable_id)
|
|
|
+ cls.__publish_submittable_property_event(submittable, True)
|
|
|
cls._submittable_id_datanodes.pop(submittable_id, None)
|
|
|
|
|
|
if to_remove_dn:
|
|
|
- cls._datanode_id_submittables.pop(datanode_id)
|
|
|
+ cls._datanode_id_submittables.pop(datanode.id)
|
|
|
|
|
|
@classmethod
|
|
|
- def _add(cls, dn: "DataNode", reason: str) -> None:
|
|
|
- from ..scenario.scenario import Scenario
|
|
|
- from ..sequence.sequence import Sequence
|
|
|
- from ..task.task import Task
|
|
|
-
|
|
|
- parent_entities = dn.get_parents()
|
|
|
+ def __add(
|
|
|
+ cls, submittable: Union["Scenario", "Sequence", "Task"], datanode: "DataNode", reason: str
|
|
|
+ ) -> None:
|
|
|
+ cls._datanode_id_submittables[datanode.id].add(submittable.id)
|
|
|
+ if submittable.id not in cls._submittable_id_datanodes:
|
|
|
+ cls.__publish_submittable_property_event(submittable, False)
|
|
|
+ cls._submittable_id_datanodes[submittable.id][datanode.id].add(reason)
|
|
|
|
|
|
- for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
|
|
|
- _ReadyToRunProperty.__add(scenario_parent.id, dn.id, reason)
|
|
|
- for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
|
|
|
- _ReadyToRunProperty.__add(sequence_parent.id, dn.id, reason)
|
|
|
- for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
|
|
|
- _ReadyToRunProperty.__add(task_parent.id, dn.id, reason)
|
|
|
+ @staticmethod
|
|
|
+ def __publish_submittable_property_event(
|
|
|
+ submittable: Union["Scenario", "Sequence", "Task"], submittable_property
|
|
|
+ ) -> None:
|
|
|
+ Notifier.publish(
|
|
|
+ _make_event(
|
|
|
+ submittable,
|
|
|
+ EventOperation.UPDATE,
|
|
|
+ attribute_name=_ReadyToRunProperty.IS_SUBMITTABLE_PROPERTY_NAME,
|
|
|
+ attribute_value=submittable_property,
|
|
|
+ )
|
|
|
+ )
|