Browse Source

Merge pull request #1086 from Avaiga/bug/#746-scenario-submittable-not-clear-in-visual-element

bug/#746 new notifier mechanism for submittable status
Toan Quach 1 year ago
parent
commit
abe39d54ab

+ 103 - 0
taipy/core/_entity/_ready_to_run_property.py

@@ -0,0 +1,103 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from typing import TYPE_CHECKING, Dict, Set, Union
+
+from ..notification import EventOperation, Notifier, _make_event
+
+if TYPE_CHECKING:
+    from ..data.data_node import DataNode, DataNodeId
+    from ..scenario.scenario import Scenario, ScenarioId
+    from ..sequence.sequence import Sequence, SequenceId
+    from ..task.task import Task, TaskId
+
+
class _ReadyToRunProperty:
    """In-memory registry of the reasons preventing submittable entities from running.

    Keeps two mirrored mappings: per data node, the submittable entities
    (Scenario, Sequence, Task) it currently blocks; and per submittable, the
    data nodes blocking it together with the reason string(s). Publishes an
    UPDATE event on the "is_submittable" attribute whenever a submittable
    switches between blocked and ready.
    """

    # Name of the attribute broadcast through the Notifier when the status changes.
    IS_SUBMITTABLE_PROPERTY_NAME: str = "is_submittable"

    # A dictionary of the data nodes not ready to be read and their corresponding submittable entities.
    _datanode_id_submittables: Dict["DataNodeId", Set[Union["ScenarioId", "SequenceId", "TaskId"]]] = {}

    # A nested dictionary of the submittable entities (Scenario, Sequence, Task) and
    # the data nodes that make it not ready_to_run with the reason(s).
    _submittable_id_datanodes: Dict[Union["ScenarioId", "SequenceId", "TaskId"], Dict["DataNodeId", Set[str]]] = {}

    @classmethod
    def _add(cls, dn: "DataNode", reason: str) -> None:
        """Register *reason* as blocking every submittable parent that uses *dn* as an input.

        Parameters:
            dn: The data node that is not ready to be read.
            reason: Human-readable explanation of why *dn* blocks its parents.
        """
        # Imported locally to avoid circular imports at module load time.
        from ..scenario.scenario import Scenario
        from ..sequence.sequence import Sequence
        from ..task.task import Task

        parent_entities = dn.get_parents()

        # Only parents consuming dn as an *input* are blocked; producing it as an output is fine.
        for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
            if dn in scenario_parent.get_inputs():
                _ReadyToRunProperty.__add(scenario_parent, dn, reason)
        for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
            if dn in sequence_parent.get_inputs():
                _ReadyToRunProperty.__add(sequence_parent, dn, reason)
        for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
            if dn in task_parent.input.values():
                _ReadyToRunProperty.__add(task_parent, dn, reason)

    @classmethod
    def _remove(cls, datanode: "DataNode", reason: str) -> None:
        """Withdraw *reason* for *datanode* from every submittable it currently blocks.

        When a submittable ends up with no blocking data node at all, an
        "is_submittable" = True event is published for it and its registry
        entry is dropped.
        """
        from ..taipy import get as tp_get

        # check the data node status to determine the reason to be removed
        submittable_ids: Set = cls._datanode_id_submittables.get(datanode.id, set())

        to_remove_dn = False
        for submittable_id in submittable_ids:
            # Drop the reason if it is currently recorded for this (submittable, datanode) pair.
            if reason in cls._submittable_id_datanodes.get(submittable_id, {}).get(datanode.id, set()):
                cls._submittable_id_datanodes[submittable_id][datanode.id].remove(reason)
            if len(cls._submittable_id_datanodes.get(submittable_id, {}).get(datanode.id, set())) == 0:
                to_remove_dn = True
                cls._submittable_id_datanodes.get(submittable_id, {}).pop(datanode.id, None)
                if (
                    submittable_id in cls._submittable_id_datanodes
                    and len(cls._submittable_id_datanodes[submittable_id]) == 0
                ):
                    # Publish the "ready again" event *before* forgetting the submittable entry.
                    submittable = tp_get(submittable_id)
                    cls.__publish_submittable_property_event(submittable, True)
                    cls._submittable_id_datanodes.pop(submittable_id, None)

        if to_remove_dn:
            cls._datanode_id_submittables.pop(datanode.id)

    @classmethod
    def __add(cls, submittable: Union["Scenario", "Sequence", "Task"], datanode: "DataNode", reason: str) -> None:
        """Record *reason* for the (submittable, datanode) pair in both registries."""
        if datanode.id not in cls._datanode_id_submittables:
            cls._datanode_id_submittables[datanode.id] = set()
        cls._datanode_id_submittables[datanode.id].add(submittable.id)

        # First blocking reason for this submittable: notify that it is no longer submittable.
        if submittable.id not in cls._submittable_id_datanodes:
            cls.__publish_submittable_property_event(submittable, False)

        if submittable.id not in cls._submittable_id_datanodes:
            cls._submittable_id_datanodes[submittable.id] = {}
        if datanode.id not in cls._submittable_id_datanodes[submittable.id]:
            cls._submittable_id_datanodes[submittable.id][datanode.id] = set()
        cls._submittable_id_datanodes[submittable.id][datanode.id].add(reason)

    @staticmethod
    def __publish_submittable_property_event(
        submittable: Union["Scenario", "Sequence", "Task"], submittable_property: bool
    ) -> None:
        """Publish an UPDATE event carrying the new "is_submittable" value."""
        Notifier.publish(
            _make_event(
                submittable,
                EventOperation.UPDATE,
                attribute_name=_ReadyToRunProperty.IS_SUBMITTABLE_PROPERTY_NAME,
                attribute_value=submittable_property,
            )
        )

+ 2 - 1
taipy/core/_entity/submittable.py

@@ -33,7 +33,8 @@ class Submittable:
         subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
         subscribers (List[Callable]): The list of callbacks to be called on `Job^`'s status change.
     """
     """
 
 
-    def __init__(self, subscribers: Optional[List[_Subscriber]] = None):
+    def __init__(self, submittable_id: str, subscribers: Optional[List[_Subscriber]] = None) -> None:
+        self._submittable_id = submittable_id
         self._subscribers = _ListAttributes(self, subscribers or [])
         self._subscribers = _ListAttributes(self, subscribers or [])
 
 
     @abc.abstractmethod
     @abc.abstractmethod

+ 1 - 1
taipy/core/cycle/cycle.py

@@ -51,7 +51,7 @@ class Cycle(_Entity, _Labeled):
         end_date: datetime,
         end_date: datetime,
         name: Optional[str] = None,
         name: Optional[str] = None,
         id: Optional[CycleId] = None,
         id: Optional[CycleId] = None,
-    ):
+    ) -> None:
         self._frequency = frequency
         self._frequency = frequency
         self._creation_date = creation_date
         self._creation_date = creation_date
         self._start_date = start_date
         self._start_date = start_date

+ 1 - 1
taipy/core/data/_abstract_sql.py

@@ -90,7 +90,7 @@ class _AbstractSQLDataNode(DataNode, _TabularDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}
         self._check_required_properties(properties)
         self._check_required_properties(properties)

+ 1 - 1
taipy/core/data/aws_s3.py

@@ -97,7 +97,7 @@ class S3ObjectDataNode(DataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         _check_dependency_is_installed("S3 Data Node", "boto3")
         _check_dependency_is_installed("S3 Data Node", "boto3")
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}

+ 1 - 1
taipy/core/data/csv.py

@@ -80,7 +80,7 @@ class CSVDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         self.id = id or self._new_id(config_id)
         self.id = id or self._new_id(config_id)
 
 
         if properties is None:
         if properties is None:

+ 22 - 1
taipy/core/data/data_node.py

@@ -9,6 +9,7 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 # specific language governing permissions and limitations under the License.
 
 
+import functools
 import os
 import os
 import uuid
 import uuid
 from abc import abstractmethod
 from abc import abstractmethod
@@ -24,6 +25,7 @@ from taipy.logger._taipy_logger import _TaipyLogger
 from .._entity._entity import _Entity
 from .._entity._entity import _Entity
 from .._entity._labeled import _Labeled
 from .._entity._labeled import _Labeled
 from .._entity._properties import _Properties
 from .._entity._properties import _Properties
+from .._entity._ready_to_run_property import _ReadyToRunProperty
 from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from .._version._version_manager_factory import _VersionManagerFactory
 from ..common._warnings import _warn_deprecated
 from ..common._warnings import _warn_deprecated
@@ -35,6 +37,23 @@ from .data_node_id import DataNodeId, Edit
 from .operator import JoinOperator
 from .operator import JoinOperator
 
 
 
 
def _update_ready_for_reading(fct):
    """Decorator for DataNode setters that re-evaluates the readiness registries.

    After the wrapped setter runs, (de)registers the "is being edited" and
    "is not written" blocking reasons in `_ReadyToRunProperty` based on the
    data node's `_edit_in_progress` and `_last_edit_date` fields.
    """
    # This decorator must be wrapped before self_setter decorator as self_setter will run the function twice.
    @functools.wraps(fct)
    def _recompute_is_ready_for_reading(dn: "DataNode", *args, **kwargs):
        fct(dn, *args, **kwargs)
        if dn._edit_in_progress:
            _ReadyToRunProperty._add(dn, f"DataNode {dn.id} is being edited")
        else:
            _ReadyToRunProperty._remove(dn, f"DataNode {dn.id} is being edited")
        # A falsy last_edit_date means the data node was never written.
        if not dn._last_edit_date:
            _ReadyToRunProperty._add(dn, f"DataNode {dn.id} is not written")
        else:
            _ReadyToRunProperty._remove(dn, f"DataNode {dn.id} is not written")

    return _recompute_is_ready_for_reading
+
+
 class DataNode(_Entity, _Labeled):
 class DataNode(_Entity, _Labeled):
     """Reference to a dataset.
     """Reference to a dataset.
 
 
@@ -172,6 +191,7 @@ class DataNode(_Entity, _Labeled):
             return self._last_edit_date
             return self._last_edit_date
 
 
     @last_edit_date.setter  # type: ignore
     @last_edit_date.setter  # type: ignore
+    @_update_ready_for_reading
     @_self_setter(_MANAGER_NAME)
     @_self_setter(_MANAGER_NAME)
     def last_edit_date(self, val):
     def last_edit_date(self, val):
         self._last_edit_date = val
         self._last_edit_date = val
@@ -236,6 +256,7 @@ class DataNode(_Entity, _Labeled):
         return self._edit_in_progress
         return self._edit_in_progress
 
 
     @edit_in_progress.setter  # type: ignore
     @edit_in_progress.setter  # type: ignore
+    @_update_ready_for_reading
     @_self_setter(_MANAGER_NAME)
     @_self_setter(_MANAGER_NAME)
     def edit_in_progress(self, val):
     def edit_in_progress(self, val):
         self._edit_in_progress = val
         self._edit_in_progress = val
@@ -382,7 +403,7 @@ class DataNode(_Entity, _Labeled):
         """Creates and adds a new entry in the edits attribute without writing the data.
         """Creates and adds a new entry in the edits attribute without writing the data.
 
 
         Parameters:
         Parameters:
-            options (dict[str, any)): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
+            options (dict[str, any]): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
                 use options to attach any information to an external edit of a data node.
                 use options to attach any information to an external edit of a data node.
         """
         """
         edit = {k: v for k, v in options.items() if v is not None}
         edit = {k: v for k, v in options.items() if v is not None}

+ 1 - 1
taipy/core/data/excel.py

@@ -84,7 +84,7 @@ class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Dict = None,
         properties: Dict = None,
-    ):
+    ) -> None:
         self.id = id or self._new_id(config_id)
         self.id = id or self._new_id(config_id)
 
 
         if properties is None:
         if properties is None:

+ 1 - 1
taipy/core/data/generic.py

@@ -76,7 +76,7 @@ class GenericDataNode(DataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Dict = None,
         properties: Dict = None,
-    ):
+    ) -> None:
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}
         if missing := set(self._REQUIRED_PROPERTIES) - set(properties.keys()):
         if missing := set(self._REQUIRED_PROPERTIES) - set(properties.keys()):

+ 2 - 2
taipy/core/data/in_memory.py

@@ -75,7 +75,7 @@ class InMemoryDataNode(DataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties=None,
         properties=None,
-    ):
+    ) -> None:
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}
         default_value = properties.pop(self.__DEFAULT_DATA_VALUE, None)
         default_value = properties.pop(self.__DEFAULT_DATA_VALUE, None)
@@ -92,7 +92,7 @@ class InMemoryDataNode(DataNode):
             edit_in_progress,
             edit_in_progress,
             editor_id,
             editor_id,
             editor_expiration_date,
             editor_expiration_date,
-            **properties
+            **properties,
         )
         )
         if default_value is not None and self.id not in in_memory_storage:
         if default_value is not None and self.id not in in_memory_storage:
             self._write(default_value)
             self._write(default_value)

+ 1 - 1
taipy/core/data/json.py

@@ -80,7 +80,7 @@ class JSONDataNode(DataNode, _FileDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         self.id = id or self._new_id(config_id)
         self.id = id or self._new_id(config_id)
 
 
         if properties is None:
         if properties is None:

+ 1 - 1
taipy/core/data/mongo.py

@@ -103,7 +103,7 @@ class MongoCollectionDataNode(DataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Dict = None,
         properties: Dict = None,
-    ):
+    ) -> None:
         _check_dependency_is_installed("Mongo Data Node", "pymongo")
         _check_dependency_is_installed("Mongo Data Node", "pymongo")
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}

+ 1 - 1
taipy/core/data/parquet.py

@@ -96,7 +96,7 @@ class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         self.id = id or self._new_id(config_id)
         self.id = id or self._new_id(config_id)
 
 
         if properties is None:
         if properties is None:

+ 1 - 1
taipy/core/data/pickle.py

@@ -72,7 +72,7 @@ class PickleDataNode(DataNode, _FileDataNodeMixin):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties=None,
         properties=None,
-    ):
+    ) -> None:
         self.id = id or self._new_id(config_id)
         self.id = id or self._new_id(config_id)
 
 
         if properties is None:
         if properties is None:

+ 1 - 1
taipy/core/data/sql.py

@@ -91,7 +91,7 @@ class SQLDataNode(_AbstractSQLDataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}
         if properties.get(self.__READ_QUERY_KEY) is None:
         if properties.get(self.__READ_QUERY_KEY) is None:

+ 1 - 1
taipy/core/data/sql_table.py

@@ -85,7 +85,7 @@ class SQLTableDataNode(_AbstractSQLDataNode):
         editor_id: Optional[str] = None,
         editor_id: Optional[str] = None,
         editor_expiration_date: Optional[datetime] = None,
         editor_expiration_date: Optional[datetime] = None,
         properties: Optional[Dict] = None,
         properties: Optional[Dict] = None,
-    ):
+    ) -> None:
         if properties is None:
         if properties is None:
             properties = {}
             properties = {}
         if properties.get(self.__TABLE_KEY) is None:
         if properties.get(self.__TABLE_KEY) is None:

+ 3 - 2
taipy/core/scenario/_scenario_manager.py

@@ -216,8 +216,9 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
         **properties,
         **properties,
     ) -> Submission:
     ) -> Submission:
         scenario_id = scenario.id if isinstance(scenario, Scenario) else scenario
         scenario_id = scenario.id if isinstance(scenario, Scenario) else scenario
-        scenario = cls._get(scenario_id)
-        if scenario is None:
+        if not isinstance(scenario, Scenario):
+            scenario = cls._get(scenario_id)
+        if scenario is None or not cls._exists(scenario_id):
             raise NonExistingScenario(scenario_id)
             raise NonExistingScenario(scenario_id)
         callbacks = callbacks or []
         callbacks = callbacks or []
         scenario_subscription_callback = cls.__get_status_notifier_callbacks(scenario) + callbacks
         scenario_subscription_callback = cls.__get_status_notifier_callbacks(scenario) + callbacks

+ 1 - 1
taipy/core/scenario/scenario.py

@@ -95,9 +95,9 @@ class Scenario(_Entity, Submittable, _Labeled):
         version: str = None,
         version: str = None,
         sequences: Optional[Dict[str, Dict]] = None,
         sequences: Optional[Dict[str, Dict]] = None,
     ):
     ):
-        super().__init__(subscribers or [])
         self._config_id = _validate_id(config_id)
         self._config_id = _validate_id(config_id)
         self.id: ScenarioId = scenario_id or self._new_id(self.config_id)
         self.id: ScenarioId = scenario_id or self._new_id(self.config_id)
+        super().__init__(self.id, subscribers or [])
 
 
         self._tasks: Union[Set[TaskId], Set[Task], Set] = tasks or set()
         self._tasks: Union[Set[TaskId], Set[Task], Set] = tasks or set()
         self._additional_data_nodes: Union[Set[DataNodeId], Set[DataNode], Set] = additional_data_nodes or set()
         self._additional_data_nodes: Union[Set[DataNodeId], Set[DataNode], Set] = additional_data_nodes or set()

+ 1 - 1
taipy/core/sequence/sequence.py

@@ -64,8 +64,8 @@ class Sequence(_Entity, Submittable, _Labeled):
         subscribers: Optional[List[_Subscriber]] = None,
         subscribers: Optional[List[_Subscriber]] = None,
         version: Optional[str] = None,
         version: Optional[str] = None,
     ):
     ):
-        super().__init__(subscribers)
         self.id: SequenceId = sequence_id
         self.id: SequenceId = sequence_id
+        super().__init__(self.id, subscribers)
         self._tasks = tasks
         self._tasks = tasks
         self._owner_id = owner_id
         self._owner_id = owner_id
         self._parent_ids = parent_ids or set()
         self._parent_ids = parent_ids or set()

+ 227 - 0
tests/core/_entity/test_ready_to_run_property.py

@@ -0,0 +1,227 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+
+from taipy import ScenarioId, SequenceId, TaskId
+from taipy.config.common.frequency import Frequency
+from taipy.config.config import Config
+from taipy.core._entity._ready_to_run_property import _ReadyToRunProperty
+from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+
+
def test_non_existing_submittable_is_not_ready_to_run():
    """Unknown entity ids must never be reported as submittable, whatever the type."""
    scenario_manager = _ScenarioManagerFactory._build_manager()
    sequence_manager = _SequenceManagerFactory._build_manager()
    task_manager = _TaskManagerFactory._build_manager()

    assert not scenario_manager._is_submittable(ScenarioId("wrong_id"))
    assert not sequence_manager._is_submittable(SequenceId("wrong_id"))
    assert not task_manager._is_submittable(TaskId("wrong_id"))
+
+
def test_scenario_without_input_is_ready_to_run():
    """A scenario whose only task has no inputs is submittable right away."""
    manager = _ScenarioManagerFactory._build_manager()
    no_io_task_cfg = Config.configure_task("task", print, [], [])
    scenario_cfg = Config.configure_scenario("sc", [no_io_task_cfg], [], Frequency.DAILY)
    scenario = manager._create(scenario_cfg)

    assert manager._is_submittable(scenario)
    # No blocking reason should have been registered for the scenario.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes
+
+
def test_scenario_submittable_with_inputs_is_ready_to_run():
    """Inputs created with default data are readable, so the scenario is submittable."""
    manager = _ScenarioManagerFactory._build_manager()
    first_input_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    second_input_cfg = Config.configure_in_memory_data_node("dn_2", 10)
    task_cfg = Config.configure_task("task", print, [first_input_cfg, second_input_cfg], [])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg])
    scenario = manager._create(scenario_cfg)

    assert manager._is_submittable(scenario)
    # A ready scenario never appears in the blocking registry.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes
+
+
def test_scenario_submittable_even_with_output_not_ready_to_run():
    """An unwritten *output* data node does not prevent submission."""
    manager = _ScenarioManagerFactory._build_manager()
    in_cfg_1 = Config.configure_in_memory_data_node("dn_1", 10)
    in_cfg_2 = Config.configure_in_memory_data_node("dn_2", 10)
    out_cfg = Config.configure_in_memory_data_node("dn_3")
    task_cfg = Config.configure_task("task", print, [in_cfg_1, in_cfg_2], [out_cfg])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg])
    scenario = manager._create(scenario_cfg)
    dn_3 = scenario.dn_3

    assert not dn_3.is_ready_for_reading
    assert manager._is_submittable(scenario)
    # Only inputs matter: the scenario is not registered as blocked.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes
+
+
def test_scenario_not_submittable_not_in_property_because_it_is_lazy():
    """An unwritten input blocks submission, but nothing is registered until a setter fires."""
    manager = _ScenarioManagerFactory._build_manager()
    written_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    unwritten_cfg = Config.configure_in_memory_data_node("dn_2")
    task_cfg = Config.configure_task("task", print, [written_cfg, unwritten_cfg], [])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg], [], Frequency.DAILY)
    scenario = manager._create(scenario_cfg)
    dn_1 = scenario.dn_1
    dn_2 = scenario.dn_2

    assert dn_1.is_ready_for_reading
    assert not dn_2.is_ready_for_reading
    assert not manager._is_submittable(scenario)

    # The property is lazy: neither the scenario nor its data nodes appear
    # in the registries until one of the tracked setters is triggered.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes
    assert dn_1.id not in _ReadyToRunProperty._datanode_id_submittables
    assert dn_2.id not in _ReadyToRunProperty._datanode_id_submittables
+
+
def test_scenario_not_submittable_if_one_input_edit_in_progress():
    """Locking an input for edition registers a blocking reason for the scenario."""
    manager = _ScenarioManagerFactory._build_manager()
    input_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    task_cfg = Config.configure_task("task", print, [input_cfg], [])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg])
    scenario = manager._create(scenario_cfg)
    dn_1 = scenario.dn_1
    dn_1.lock_edit()

    assert not dn_1.is_ready_for_reading
    assert not manager._is_submittable(scenario)

    # Both directions of the registry are populated, with the edition reason only.
    assert scenario.id in _ReadyToRunProperty._submittable_id_datanodes
    assert dn_1.id in _ReadyToRunProperty._submittable_id_datanodes[scenario.id]
    assert dn_1.id in _ReadyToRunProperty._datanode_id_submittables
    assert scenario.id in _ReadyToRunProperty._datanode_id_submittables[dn_1.id]
    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id][dn_1.id] == {
        f"DataNode {dn_1.id} is being edited"
    }
+
+
def test_scenario_not_submittable_for_multiple_reasons():
    """A locked-and-unwritten input accumulates two distinct blocking reasons."""
    manager = _ScenarioManagerFactory._build_manager()
    cfg_with_data_1 = Config.configure_in_memory_data_node("dn_1", 10)
    cfg_without_data = Config.configure_in_memory_data_node("dn_2")
    cfg_with_data_2 = Config.configure_in_memory_data_node("dn_3", 10)
    task_cfg = Config.configure_task("task", print, [cfg_with_data_1, cfg_without_data, cfg_with_data_2], [])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg])
    scenario = manager._create(scenario_cfg)
    dn_1 = scenario.dn_1
    dn_1.lock_edit()
    dn_2 = scenario.dn_2
    dn_2.lock_edit()

    assert not dn_1.is_ready_for_reading
    assert not dn_2.is_ready_for_reading
    assert not manager._is_submittable(scenario)

    assert scenario.id in _ReadyToRunProperty._submittable_id_datanodes
    assert dn_1.id in _ReadyToRunProperty._submittable_id_datanodes[scenario.id]
    assert dn_2.id in _ReadyToRunProperty._submittable_id_datanodes[scenario.id]
    assert dn_1.id in _ReadyToRunProperty._datanode_id_submittables
    assert dn_2.id in _ReadyToRunProperty._datanode_id_submittables
    assert scenario.id in _ReadyToRunProperty._datanode_id_submittables[dn_1.id]
    # dn_1 holds default data, so only the edition lock blocks it...
    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id][dn_1.id] == {
        f"DataNode {dn_1.id} is being edited"
    }
    assert scenario.id in _ReadyToRunProperty._datanode_id_submittables[dn_2.id]
    # ...while dn_2 is both locked and never written.
    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id][dn_2.id] == {
        f"DataNode {dn_2.id} is being edited",
        f"DataNode {dn_2.id} is not written",
    }
+
+
def test_writing_input_remove_reasons():
    """Writing a blocked input clears every reason and makes the scenario submittable."""
    manager = _ScenarioManagerFactory._build_manager()
    input_cfg = Config.configure_in_memory_data_node("dn_1")
    task_cfg = Config.configure_task("task", print, [input_cfg], [])
    scenario_cfg = Config.configure_scenario("sc", [task_cfg])
    scenario = manager._create(scenario_cfg)
    dn_1 = scenario.dn_1

    assert not dn_1.is_ready_for_reading
    assert not manager._is_submittable(scenario)
    # Lazy property: the scenario is not registered yet.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes

    dn_1.lock_edit()
    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id][dn_1.id] == {
        f"DataNode {dn_1.id} is being edited",
        f"DataNode {dn_1.id} is not written",
    }

    dn_1.write(10)
    assert manager._is_submittable(scenario)
    # Writing removed both reasons, so the registry entries are gone.
    assert scenario.id not in _ReadyToRunProperty._submittable_id_datanodes
    assert dn_1.id not in _ReadyToRunProperty._datanode_id_submittables
+
+
def identity(arg):
    """Return *arg* unchanged; used as a pass-through task function."""
    result = arg
    return result
+
+
def __assert_not_submittable_becomes_submittable_when_dn_edited(sequence, manager, dn):
    """Check that *sequence*, blocked by the unwritten *dn*, becomes submittable once *dn* is written."""
    by_submittable = _ReadyToRunProperty._submittable_id_datanodes
    by_datanode = _ReadyToRunProperty._datanode_id_submittables

    assert not dn.is_ready_for_reading
    assert not manager._is_submittable(sequence)
    # Lazy property: nothing is registered until a tracked setter fires on dn.
    assert sequence.id not in by_submittable

    dn.lock_edit()
    expected_reasons = {
        f"DataNode {dn.id} is being edited",
        f"DataNode {dn.id} is not written",
    }
    assert by_submittable[sequence.id][dn.id] == expected_reasons

    dn.write("ANY VALUE")
    assert manager._is_submittable(sequence)
    assert sequence.id not in by_submittable
    assert dn.id not in by_datanode
+
+
def test_writing_config_sequence_input_remove_reasons():
    """A sequence declared in the scenario config becomes submittable once its input is written."""
    scenario_manager = _ScenarioManagerFactory._build_manager()
    source_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    middle_cfg = Config.configure_in_memory_data_node("dn_2")
    sink_cfg = Config.configure_in_memory_data_node("dn_3")
    first_task_cfg = Config.configure_task("task_1", identity, [source_cfg], [middle_cfg])
    second_task_cfg = Config.configure_task("task_2", identity, [middle_cfg], [sink_cfg])
    scenario_cfg = Config.configure_scenario(
        "sc", [first_task_cfg, second_task_cfg], sequences={"seq": [second_task_cfg]}
    )
    scenario = scenario_manager._create(scenario_cfg)

    sequence_manager = _SequenceManagerFactory._build_manager()
    __assert_not_submittable_becomes_submittable_when_dn_edited(
        scenario.sequences["seq"], sequence_manager, scenario.dn_2
    )
+
+
def test_writing_runtime_sequence_input_remove_reasons():
    """A sequence added at runtime becomes submittable once its input is written."""
    scenario_manager = _ScenarioManagerFactory._build_manager()
    source_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    middle_cfg = Config.configure_in_memory_data_node("dn_2")
    sink_cfg = Config.configure_in_memory_data_node("dn_3")
    first_task_cfg = Config.configure_task("task_1", identity, [source_cfg], [middle_cfg])
    second_task_cfg = Config.configure_task("task_2", identity, [middle_cfg], [sink_cfg])
    scenario_cfg = Config.configure_scenario("sc", [first_task_cfg, second_task_cfg])
    scenario = scenario_manager._create(scenario_cfg)
    # The sequence is created after the scenario, at runtime.
    scenario.add_sequence("seq", [scenario.tasks["task_2"]])

    sequence_manager = _SequenceManagerFactory._build_manager()
    __assert_not_submittable_becomes_submittable_when_dn_edited(
        scenario.sequences["seq"], sequence_manager, scenario.dn_2
    )
+
+
def test_writing_task_input_remove_reasons():
    """A task becomes submittable once its unwritten input is written."""
    scenario_manager = _ScenarioManagerFactory._build_manager()
    source_cfg = Config.configure_in_memory_data_node("dn_1", 10)
    middle_cfg = Config.configure_in_memory_data_node("dn_2")
    sink_cfg = Config.configure_in_memory_data_node("dn_3")
    first_task_cfg = Config.configure_task("task_1", identity, [source_cfg], [middle_cfg])
    second_task_cfg = Config.configure_task("task_2", identity, [middle_cfg], [sink_cfg])
    scenario_cfg = Config.configure_scenario("sc", [first_task_cfg, second_task_cfg])
    scenario = scenario_manager._create(scenario_cfg)

    task_manager = _TaskManagerFactory._build_manager()
    __assert_not_submittable_becomes_submittable_when_dn_edited(
        scenario.tasks["task_2"], task_manager, scenario.dn_2
    )

+ 2 - 0
tests/core/data/test_data_node.py

@@ -497,6 +497,8 @@ class TestDataNode:
         _DataManager._set(dn_2)
         _DataManager._set(dn_2)
         assert dn_1.parent_ids == {"sc1"}
         assert dn_1.parent_ids == {"sc1"}
         assert dn_2.parent_ids == {"sc1"}
         assert dn_2.parent_ids == {"sc1"}
+        dn_2._parent_ids.clear()
+        _DataManager._set(dn_2)
 
 
         # auto set & reload on edit_in_progress attribute
         # auto set & reload on edit_in_progress attribute
         assert not dn_2.edit_in_progress
         assert not dn_2.edit_in_progress

+ 6 - 6
tests/core/notification/test_events_published.py

@@ -10,7 +10,7 @@
 # specific language governing permissions and limitations under the License.
 # specific language governing permissions and limitations under the License.
 
 
 from queue import SimpleQueue
 from queue import SimpleQueue
-from typing import Dict, List
+from typing import Any, Dict, List
 
 
 from taipy.config import Config, Frequency
 from taipy.config import Config, Frequency
 from taipy.core import taipy as tp
 from taipy.core import taipy as tp
@@ -31,6 +31,7 @@ class Snapshot:
         self.entity_type_collected: Dict[EventEntityType, int] = {}
         self.entity_type_collected: Dict[EventEntityType, int] = {}
         self.operation_collected: Dict[EventEntityType, int] = {}
         self.operation_collected: Dict[EventEntityType, int] = {}
         self.attr_name_collected: Dict[EventEntityType, int] = {}
         self.attr_name_collected: Dict[EventEntityType, int] = {}
+        self.attr_value_collected: Dict[EventEntityType, List[Any]] = {}
 
 
     def capture_event(self, event):
     def capture_event(self, event):
         self.collected_events.append(event)
         self.collected_events.append(event)
@@ -38,6 +39,10 @@ class Snapshot:
         self.operation_collected[event.operation] = self.operation_collected.get(event.operation, 0) + 1
         self.operation_collected[event.operation] = self.operation_collected.get(event.operation, 0) + 1
         if event.attribute_name:
         if event.attribute_name:
             self.attr_name_collected[event.attribute_name] = self.attr_name_collected.get(event.attribute_name, 0) + 1
             self.attr_name_collected[event.attribute_name] = self.attr_name_collected.get(event.attribute_name, 0) + 1
+            if self.attr_value_collected.get(event.attribute_name, None):
+                self.attr_value_collected[event.attribute_name].append(event.attribute_value)
+            else:
+                self.attr_value_collected[event.attribute_name] = [event.attribute_value]
 
 
 
 
 class RecordingConsumer(CoreEventConsumerBase):
 class RecordingConsumer(CoreEventConsumerBase):
@@ -145,12 +150,7 @@ def test_events_published_for_writing_dn():
     scenario.the_input.write("test")
     scenario.the_input.write("test")
     snapshot = all_evts.capture()
     snapshot = all_evts.capture()
     assert len(snapshot.collected_events) == 4
     assert len(snapshot.collected_events) == 4
-    assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
     assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
-    assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
-    assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
-    assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 0
-    assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 0
     assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
     assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
     all_evts.stop()
     all_evts.stop()
 
 

+ 117 - 0
tests/core/notification/test_published_ready_to_run_event.py

@@ -0,0 +1,117 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+
+from taipy.config.config import Config
+from taipy.core.notification.event import EventEntityType, EventOperation
+from taipy.core.notification.notifier import Notifier
+from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+from tests.core.notification.test_events_published import RecordingConsumer
+
+
+def empty_fct(inp):
+    return inp
+
+
+def test_lock_edit_publish_submittable_event():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1")
+    dn_config_2 = Config.configure_pickle_data_node("dn_2")
+    task_config = Config.configure_task("task", empty_fct, [dn_config_1], [dn_config_2])
+    scenario_config = Config.configure_scenario("sc", [task_config])
+    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
+    scenario.add_sequences({"sequence": [scenario.task]})
+    dn_1 = scenario.dn_1
+    register_id_0, register_queue_0 = Notifier.register()
+    all_evts = RecordingConsumer(register_id_0, register_queue_0)
+
+    all_evts.start()
+    dn_1.lock_edit()
+    snapshot = all_evts.capture()
+
+    assert len(snapshot.collected_events) == 6
+    assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 3
+    assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 1
+    assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 1
+    assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 6
+    assert snapshot.attr_name_collected["is_submittable"] == 3
+    assert snapshot.attr_value_collected["is_submittable"] == [False, False, False]
+
+
+def test_write_never_written_input_does_not_publish_submittable_event():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1")
+    dn_config_2 = Config.configure_pickle_data_node("dn_2")
+    task_config = Config.configure_task("task", empty_fct, [dn_config_1], [dn_config_2])
+    scenario_config = Config.configure_scenario("sc", [task_config])
+    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
+    scenario.add_sequences({"sequence": [scenario.task]})
+    register_id_0, register_queue_0 = Notifier.register()
+    all_evts = RecordingConsumer(register_id_0, register_queue_0)
+
+    all_evts.start()
+    scenario.dn_1.write(15)
+    snapshot = all_evts.capture()
+
+    # Since it is a lazy property, no submittable event is published. Only the data node update events are published.
+    assert len(snapshot.collected_events) == 4
+    assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
+
+def test_write_never_written_input_publish_submittable_event_if_scenario_in_property():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1")
+    dn_config_2 = Config.configure_pickle_data_node("dn_2")
+    task_config = Config.configure_task("task", empty_fct, [dn_config_1], [dn_config_2])
+    scenario_config = Config.configure_scenario("sc", [task_config])
+    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
+    scenario.add_sequences({"sequence": [scenario.task]})
+    register_id_0, register_queue_0 = Notifier.register()
+    all_evts = RecordingConsumer(register_id_0, register_queue_0)
+
+    # This makes the dn_1 not ready for 2 reasons. 1. It is not written. 2. It is locked. PLus it makes the scenario,
+    # the sequence and the task handled by the property.
+    scenario.dn_1.lock_edit()
+
+    all_evts.start()
+    scenario.dn_1.write(15)
+    snapshot = all_evts.capture()
+
+    # Since it is a lazy property, no submittable event is published. Only the data node update events are published.
+    assert len(snapshot.collected_events) == 13
+    assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
+    assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 2
+    assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 2
+    assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 2
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 13
+    assert snapshot.attr_name_collected["is_submittable"] == 6
+    assert snapshot.attr_value_collected["is_submittable"] == [False, False, False, True, True, True]
+
+
+
+def test_write_output_does_not_publish_submittable_event():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1", default_data="any value")
+    dn_config_2 = Config.configure_pickle_data_node("dn_2")
+    task_config = Config.configure_task("task", empty_fct, [dn_config_1], [dn_config_2])
+    scenario_config = Config.configure_scenario("sc", [task_config])
+    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
+    scenario.add_sequences({"sequence": [scenario.task]})
+    register_id_0, register_queue_0 = Notifier.register()
+    all_evts = RecordingConsumer(register_id_0, register_queue_0)
+
+    all_evts.start()
+    scenario.dn_2.write(15)
+    snapshot = all_evts.capture()
+
+    assert len(snapshot.collected_events) == 4
+    assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
+    assert "is_submittable" not in snapshot.attr_name_collected
+    assert "is_submittable" not in snapshot.attr_value_collected
+    all_evts.stop()

+ 39 - 18
tests/core/scenario/test_scenario.py

@@ -981,34 +981,55 @@ def test_get_inputs_outputs_intermediate_data_nodes():
 
 
 
 
 def test_is_ready_to_run():
 def test_is_ready_to_run():
-    data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", properties={"default_data": 1})
-    data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", properties={"default_data": 2})
-    data_node_4 = PickleDataNode("qux", Scope.SCENARIO, "s4", properties={"default_data": 4})
-    data_node_5 = PickleDataNode("quux", Scope.SCENARIO, "s5", properties={"default_data": 5})
-    data_node_6 = PickleDataNode("quuz", Scope.SCENARIO, "s6", properties={"default_data": 6})
-    data_node_7 = PickleDataNode("corge", Scope.SCENARIO, "s7", properties={"default_data": 7})
-    data_node_8 = PickleDataNode("d8", Scope.SCENARIO, "s8", properties={"default_data": 8})
-    data_node_9 = PickleDataNode("d9", Scope.SCENARIO, "s9", properties={"default_data": 9})
-    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
-    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
-    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
-    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
-    task_5 = Task("t5", {}, print, [data_node_8], [data_node_9], TaskId("t5"))
-    task_6 = Task("t6", {}, print, [data_node_7, data_node_9], id=TaskId("t6"))
-    scenario = Scenario("scenario", {task_1, task_2, task_3, task_4, task_5, task_6}, {}, set(), ScenarioId("s1"))
-    # s1 ---      s6 ---> t2 ---> s5
+    task_1_id, task_2_id, task_3_id, task_4_id, task_5_id, task_6_id = (
+        TaskId("TASK_t1"),
+        TaskId("TASK_t2"),
+        TaskId("TASK_t3"),
+        TaskId("TASK_t4"),
+        TaskId("TASK_t5"),
+        TaskId("TASK_t6"),
+    )
+    sc_id = ScenarioId("SCENARIO_s1")
+    data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "d1", parent_ids={task_1_id}, properties={"default_data": 1})
+    data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "d2", parent_ids={task_1_id}, properties={"default_data": 2})
+    data_node_4 = PickleDataNode(
+        "qux", Scope.SCENARIO, "d4", parent_ids={task_1_id, task_4_id, task_3_id}, properties={"default_data": 4}
+    )
+    data_node_5 = PickleDataNode(
+        "quux", Scope.SCENARIO, "d5", parent_ids={task_2_id, task_3_id}, properties={"default_data": 5}
+    )
+    data_node_6 = PickleDataNode("quuz", Scope.SCENARIO, "d6", parent_ids={task_2_id}, properties={"default_data": 6})
+    data_node_7 = PickleDataNode(
+        "corge", Scope.SCENARIO, "d7", parent_ids={task_4_id, task_6_id}, properties={"default_data": 7}
+    )
+    data_node_8 = PickleDataNode("d8", Scope.SCENARIO, "d8", parent_ids={task_5_id}, properties={"default_data": 8})
+    data_node_9 = PickleDataNode(
+        "d9", Scope.SCENARIO, "d9", parent_ids={task_5_id, task_6_id}, properties={"default_data": 9}
+    )
+    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], id=task_1_id, parent_ids={sc_id})
+    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], id=task_2_id, parent_ids={sc_id})
+    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=task_3_id, parent_ids={sc_id})
+    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], id=task_4_id, parent_ids={sc_id})
+    task_5 = Task("t5", {}, print, [data_node_8], [data_node_9], id=task_5_id, parent_ids={sc_id})
+    task_6 = Task("t6", {}, print, [data_node_7, data_node_9], id=task_6_id, parent_ids={sc_id})
+    scenario = Scenario("scenario", {task_1, task_2, task_3, task_4, task_5, task_6}, {}, set(), scenario_id=sc_id)
+    # d1 ---      d6 ---> t2 ---> d5
     #       |                     |
     #       |                     |
     #       |---> t1 ---|      -----> t3
     #       |---> t1 ---|      -----> t3
     #       |           |      |
     #       |           |      |
-    # s2 ---             ---> s4 ---> t4 ---> s7 ---> t6
+    # d2 ---             ---> d4 ---> t4 ---> d7 ---> t6
     #                                              |
     #                                              |
-    # s8 -------> t5 -------> s9 ------------------
+    # d8 -------> t5 -------> d9 ------------------
     assert scenario.get_inputs() == {data_node_1, data_node_2, data_node_6, data_node_8}
     assert scenario.get_inputs() == {data_node_1, data_node_2, data_node_6, data_node_8}
 
 
     data_manager = _DataManagerFactory._build_manager()
     data_manager = _DataManagerFactory._build_manager()
     data_manager._delete_all()
     data_manager._delete_all()
     for dn in [data_node_1, data_node_2, data_node_4, data_node_5, data_node_6, data_node_7, data_node_8, data_node_9]:
     for dn in [data_node_1, data_node_2, data_node_4, data_node_5, data_node_6, data_node_7, data_node_8, data_node_9]:
         data_manager._set(dn)
         data_manager._set(dn)
+    task_manager = _TaskManagerFactory._build_manager()
+    for task in [task_1, task_2, task_3, task_4, task_5, task_6]:
+        task_manager._set(task)
+    _ScenarioManagerFactory._build_manager()._set(scenario)
 
 
     assert scenario.is_ready_to_run()
     assert scenario.is_ready_to_run()
 
 

+ 17 - 15
tests/core/scenario/test_scenario_manager.py

@@ -580,8 +580,8 @@ def test_notification_subscribe(mocker):
             Config.configure_task(
             Config.configure_task(
                 "mult_by_2",
                 "mult_by_2",
                 mult_by_2,
                 mult_by_2,
-                [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
-                Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
+                [Config.configure_data_node("foo", "pickle", Scope.SCENARIO, default_data=1)],
+                Config.configure_data_node("bar", "pickle", Scope.SCENARIO, default_data=0),
             )
             )
         ],
         ],
     )
     )
@@ -590,34 +590,36 @@ def test_notification_subscribe(mocker):
 
 
     notify_1 = NotifyMock(scenario)
     notify_1 = NotifyMock(scenario)
     notify_2 = NotifyMock(scenario)
     notify_2 = NotifyMock(scenario)
-    mocker.patch.object(_utils, "_load_fct", side_effect=[notify_1, notify_2])
+    mocker.patch.object(
+        _utils,
+        "_load_fct",
+        side_effect=[
+            notify_1,
+            notify_1,
+            notify_1,
+            notify_1,
+            notify_2,
+            notify_2,
+            notify_2,
+            notify_2,
+        ],
+    )
 
 
     # test subscribing notification
     # test subscribing notification
     _ScenarioManager._subscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._submit(scenario)
     _ScenarioManager._submit(scenario)
     notify_1.assert_called_3_times()
     notify_1.assert_called_3_times()
-
     notify_1.reset()
     notify_1.reset()
 
 
     # test unsubscribing notification
     # test unsubscribing notification
-    # test notis subscribe only on new jobs
-    # _ScenarioManager._get(scenario)
+    # test notif subscribe only on new jobs
     _ScenarioManager._unsubscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._unsubscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_2, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_2, scenario=scenario)
     _ScenarioManager._submit(scenario)
     _ScenarioManager._submit(scenario)
-
     notify_1.assert_not_called()
     notify_1.assert_not_called()
     notify_2.assert_called_3_times()
     notify_2.assert_called_3_times()
 
 
 
 
-class Notify:
-    def __call__(self, *args, **kwargs):
-        self.args = args
-
-    def assert_called_with(self, args):
-        assert args in self.args
-
-
 def test_notification_subscribe_multiple_params(mocker):
 def test_notification_subscribe_multiple_params(mocker):
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
 

+ 35 - 19
tests/core/sequence/test_sequence.py

@@ -432,26 +432,42 @@ def test_get_inputs():
 
 
 
 
 def test_is_ready_to_run():
 def test_is_ready_to_run():
-    data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", properties={"default_data": 1})
-    data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", properties={"default_data": 2})
-    data_node_4 = PickleDataNode("qux", Scope.SCENARIO, "s4", properties={"default_data": 4})
-    data_node_5 = PickleDataNode("quux", Scope.SCENARIO, "s5", properties={"default_data": 5})
-    data_node_6 = PickleDataNode("quuz", Scope.SCENARIO, "s6", properties={"default_data": 6})
-    data_node_7 = PickleDataNode("corge", Scope.SCENARIO, "s7", properties={"default_data": 7})
-    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], TaskId("t1"))
-    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], TaskId("t2"))
-    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=TaskId("t3"))
-    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
-    sequence = Sequence({}, [task_4, task_2, task_1, task_3], SequenceId("p1"))
-    # s1 ---      s6 ---> t2 ---> s5
-    #       |                     |
-    #       |---> t1 ---|      -----> t3
-    #       |           |      |
-    # s2 ---             ---> s4 ---> t4 ---> s7
+    scenario_id = "SCENARIO_scenario_id"
+    task_1_id, task_2_id, task_3_id, task_4_id = (
+        TaskId("TASK_t1"),
+        TaskId("TASK_t2"),
+        TaskId("TASK_t3"),
+        TaskId("TASK_t4"),
+    )
+    data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", parent_ids={task_1_id}, properties={"default_data": 1})
+    data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", parent_ids={task_1_id}, properties={"default_data": 2})
+    data_node_3 = PickleDataNode(
+        "qux", Scope.SCENARIO, "s3", parent_ids={task_1_id, task_3_id, task_4_id}, properties={"default_data": 4}
+    )
+    data_node_4 = PickleDataNode(
+        "quux", Scope.SCENARIO, "s4", parent_ids={task_2_id, task_3_id}, properties={"default_data": 5}
+    )
+    data_node_5 = PickleDataNode("quuz", Scope.SCENARIO, "s5", parent_ids={task_2_id}, properties={"default_data": 6})
+    data_node_6 = PickleDataNode("corge", Scope.SCENARIO, "s6", parent_ids={task_4_id}, properties={"default_data": 7})
+    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3], id=task_1_id)
+    task_2 = Task("garply", {}, print, [data_node_5], [data_node_4], id=task_2_id)
+    task_3 = Task("waldo", {}, print, [data_node_4, data_node_3], id=task_3_id)
+    task_4 = Task("fred", {}, print, [data_node_3], [data_node_6], id=task_4_id)
+    scenario = Scenario("scenario_config", [task_1, task_2, task_3, task_4], {}, scenario_id=scenario_id)
 
 
     data_manager = _DataManagerFactory._build_manager()
     data_manager = _DataManagerFactory._build_manager()
-    for dn in [data_node_1, data_node_2, data_node_4, data_node_5, data_node_6, data_node_7]:
+    for dn in [data_node_1, data_node_2, data_node_3, data_node_4, data_node_5, data_node_6]:
         data_manager._set(dn)
         data_manager._set(dn)
+    for task in [task_1, task_2, task_3, task_4]:
+        _TaskManager._set(task)
+    _ScenarioManager._set(scenario)
+    scenario.add_sequence("sequence", [task_4, task_2, task_1, task_3])
+    sequence = scenario.sequences["sequence"]
+    # s1 ---      s5 ---> t2 ---> s4
+    #       |                     |
+    #       |---> t1 ---|      -----> t3
+    #       |           |      |
+    # s2 ---             ---> s3 ---> t4 ---> s6
 
 
     assert sequence.is_ready_to_run()
     assert sequence.is_ready_to_run()
 
 
@@ -459,12 +475,12 @@ def test_is_ready_to_run():
     assert not sequence.is_ready_to_run()
     assert not sequence.is_ready_to_run()
 
 
     data_node_2.edit_in_progress = True
     data_node_2.edit_in_progress = True
-    data_node_6.edit_in_progress = True
+    data_node_5.edit_in_progress = True
     assert not sequence.is_ready_to_run()
     assert not sequence.is_ready_to_run()
 
 
     data_node_1.edit_in_progress = False
     data_node_1.edit_in_progress = False
     data_node_2.edit_in_progress = False
     data_node_2.edit_in_progress = False
-    data_node_6.edit_in_progress = False
+    data_node_5.edit_in_progress = False
     assert sequence.is_ready_to_run()
     assert sequence.is_ready_to_run()
 
 
 
 

+ 16 - 1
tests/core/sequence/test_sequence_manager.py

@@ -471,7 +471,22 @@ def test_sequence_notification_subscribe(mocker):
     notify_2.__module__ = "notify_2"
     notify_2.__module__ = "notify_2"
     # Mocking this because NotifyMock is a class that does not loads correctly when getting the sequence
     # Mocking this because NotifyMock is a class that does not loads correctly when getting the sequence
     # from the storage.
     # from the storage.
-    mocker.patch.object(_utils, "_load_fct", side_effect=[notify_1, notify_1, notify_2, notify_2, notify_2, notify_2])
+    mocker.patch.object(
+        _utils,
+        "_load_fct",
+        side_effect=[
+            notify_1,
+            notify_1,
+            notify_1,
+            notify_1,
+            notify_2,
+            notify_2,
+            notify_2,
+            notify_2,
+            notify_2,
+            notify_2,
+        ],
+    )
 
 
     # test subscription
     # test subscription
     callback = mock.MagicMock()
     callback = mock.MagicMock()

+ 10 - 10
tests/rest/conftest.py

@@ -25,7 +25,7 @@ from taipy.config.common.scope import Scope
 from taipy.core import Cycle, DataNodeId, Job, JobId, Scenario, Sequence, Task
 from taipy.core import Cycle, DataNodeId, Job, JobId, Scenario, Sequence, Task
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.cycle._cycle_manager import _CycleManager
-from taipy.core.data.in_memory import InMemoryDataNode
+from taipy.core.data.pickle import PickleDataNode
 from taipy.core.job._job_manager import _JobManager
 from taipy.core.job._job_manager import _JobManager
 from taipy.core.task._task_manager import _TaskManager
 from taipy.core.task._task_manager import _TaskManager
 from taipy.rest.app import create_app
 from taipy.rest.app import create_app
@@ -124,24 +124,24 @@ def scenario_data():
 
 
 @pytest.fixture
 @pytest.fixture
 def default_datanode():
 def default_datanode():
-    return InMemoryDataNode(
+    return PickleDataNode(
         "input_ds",
         "input_ds",
         Scope.SCENARIO,
         Scope.SCENARIO,
         DataNodeId("f"),
         DataNodeId("f"),
-        "my name",
         "owner_id",
         "owner_id",
+        None,
         properties={"default_data": [1, 2, 3, 4, 5, 6]},
         properties={"default_data": [1, 2, 3, 4, 5, 6]},
     )
     )
 
 
 
 
 @pytest.fixture
 @pytest.fixture
 def default_df_datanode():
 def default_df_datanode():
-    return InMemoryDataNode(
+    return PickleDataNode(
         "input_ds",
         "input_ds",
         Scope.SCENARIO,
         Scope.SCENARIO,
         DataNodeId("id_uio2"),
         DataNodeId("id_uio2"),
-        "my name",
         "owner_id",
         "owner_id",
+        None,
         properties={"default_data": pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}])},
         properties={"default_data": pd.DataFrame([{"a": 1, "b": 2}, {"a": 3, "b": 4}, {"a": 5, "b": 6}])},
     )
     )
 
 
@@ -160,21 +160,21 @@ def default_datanode_config_list():
 
 
 
 
 def __default_task():
 def __default_task():
-    input_ds = InMemoryDataNode(
+    input_ds = PickleDataNode(
         "input_ds",
         "input_ds",
         Scope.SCENARIO,
         Scope.SCENARIO,
         DataNodeId("id_uio"),
         DataNodeId("id_uio"),
-        "my name",
         "owner_id",
         "owner_id",
+        {"TASK_task_id"},
         properties={"default_data": "In memory Data Source"},
         properties={"default_data": "In memory Data Source"},
     )
     )
 
 
-    output_ds = InMemoryDataNode(
+    output_ds = PickleDataNode(
         "output_ds",
         "output_ds",
         Scope.SCENARIO,
         Scope.SCENARIO,
         DataNodeId("id_uio"),
         DataNodeId("id_uio"),
-        "my name",
         "owner_id",
         "owner_id",
+        {"TASK_task_id"},
         properties={"default_data": "In memory Data Source"},
         properties={"default_data": "In memory Data Source"},
     )
     )
     return Task(
     return Task(
@@ -183,7 +183,7 @@ def __default_task():
         function=print,
         function=print,
         input=[input_ds],
         input=[input_ds],
         output=[output_ds],
         output=[output_ds],
-        id=None,
+        id="TASK_task_id",
     )
     )