Преглед на файлове

cleaned submittable dn cache code, fixed failing test

Toan Quach преди 1 година
родител
ревизия
42111a80d4

+ 3 - 6
taipy/core/_entity/submittable.py

@@ -19,7 +19,7 @@ from ..common._listattributes import _ListAttributes
 from ..common._utils import _Subscriber
 from ..common._utils import _Subscriber
 from ..data.data_node import DataNode
 from ..data.data_node import DataNode
 from ..job.job import Job
 from ..job.job import Job
-from ..notification.dn_scenario_cache import SubmittableStatusCache
+from ..notification._submittable_status_cache import SubmittableStatusCache
 from ..submission.submission import Submission
 from ..submission.submission import Submission
 from ..task.task import Task
 from ..task.task import Task
 from ._dag import _DAG
 from ._dag import _DAG
@@ -90,11 +90,8 @@ class Submittable:
         """
         """
         if self._submittable_id not in SubmittableStatusCache.submittable_id_datanodes:
         if self._submittable_id not in SubmittableStatusCache.submittable_id_datanodes:
             for dn in self.get_inputs():
             for dn in self.get_inputs():
-                if not dn.is_ready_for_reading:
-                    SubmittableStatusCache.add(self._submittable_id, dn.id)
-        if len(SubmittableStatusCache.submittable_id_datanodes[self._submittable_id]) == 0:
-            return True
-        return False
+                SubmittableStatusCache._compute_if_dn_is_ready_for_reading(dn)
+        return SubmittableStatusCache._check_submittable_is_ready_to_submit(self._submittable_id)
 
 
     def data_nodes_being_edited(self) -> Set[DataNode]:
     def data_nodes_being_edited(self) -> Set[DataNode]:
         """Return the set of data nodes of the submittable entity that are being edited.
         """Return the set of data nodes of the submittable entity that are being edited.

+ 9 - 22
taipy/core/data/data_node.py

@@ -30,33 +30,22 @@ from .._version._version_manager_factory import _VersionManagerFactory
 from ..common._warnings import _warn_deprecated
 from ..common._warnings import _warn_deprecated
 from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
 from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
 from ..job.job_id import JobId
 from ..job.job_id import JobId
-from ..notification.dn_scenario_cache import SubmittableStatusCache
+from ..notification._submittable_status_cache import SubmittableStatusCache
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from ._filter import _FilterDataNode
 from ._filter import _FilterDataNode
 from .data_node_id import DataNodeId, Edit
 from .data_node_id import DataNodeId, Edit
 from .operator import JoinOperator
 from .operator import JoinOperator
 
 
 
 
-def _update_parent_submittable_cache_on_edit_in_progress(fct):
+def _recompute_submittable_cache_wrapper(fct):
+    # This decorator must be wrapped before self_setter decorator as self_setter will run the function twice.
+
     @functools.wraps(fct)
     @functools.wraps(fct)
-    def _do_update_parent_submittable_cache_on_edit_in_progress(dn, *args, **kwargs):
+    def _recompute_is_ready_for_reading(dn: "DataNode", *args, **kwargs):
         fct(dn, *args, **kwargs)
         fct(dn, *args, **kwargs)
-        if not dn._edit_in_progress:
-            SubmittableStatusCache.remove(dn.id)
-        else:
-            from ..scenario.scenario import Scenario
-            from ..sequence.sequence import Sequence
-            from ..task.task import Task
-
-            parent_entities = dn.get_parents()
-            for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
-                SubmittableStatusCache.add(scenario_parent.id, dn.id)
-            for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
-                SubmittableStatusCache.add(sequence_parent.id, dn.id)
-            for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
-                SubmittableStatusCache.add(task_parent.id, dn.id)
+        SubmittableStatusCache._compute_if_dn_is_ready_for_reading(dn)
 
 
-    return _do_update_parent_submittable_cache_on_edit_in_progress
+    return _recompute_is_ready_for_reading
 
 
 
 
 class DataNode(_Entity, _Labeled):
 class DataNode(_Entity, _Labeled):
@@ -196,12 +185,10 @@ class DataNode(_Entity, _Labeled):
             return self._last_edit_date
             return self._last_edit_date
 
 
     @last_edit_date.setter  # type: ignore
     @last_edit_date.setter  # type: ignore
+    @_recompute_submittable_cache_wrapper
     @_self_setter(_MANAGER_NAME)
     @_self_setter(_MANAGER_NAME)
     def last_edit_date(self, val):
     def last_edit_date(self, val):
-        prev_last_edit_date = self._last_edit_date
         self._last_edit_date = val
         self._last_edit_date = val
-        if prev_last_edit_date is None and self._last_edit_date:
-            SubmittableStatusCache.remove(self.id)
 
 
     @property  # type: ignore
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
     @_self_reload(_MANAGER_NAME)
@@ -263,7 +250,7 @@ class DataNode(_Entity, _Labeled):
         return self._edit_in_progress
         return self._edit_in_progress
 
 
     @edit_in_progress.setter  # type: ignore
     @edit_in_progress.setter  # type: ignore
-    @_update_parent_submittable_cache_on_edit_in_progress
+    @_recompute_submittable_cache_wrapper
     @_self_setter(_MANAGER_NAME)
     @_self_setter(_MANAGER_NAME)
     def edit_in_progress(self, val):
     def edit_in_progress(self, val):
         self._edit_in_progress = val
         self._edit_in_progress = val

+ 30 - 2
taipy/core/notification/dn_scenario_cache.py → taipy/core/notification/_submittable_status_cache.py

@@ -21,15 +21,43 @@ class SubmittableStatusCache:
     submittable_id_datanodes: Dict[str, Set["DataNodeId"]] = defaultdict(lambda: set())
     submittable_id_datanodes: Dict[str, Set["DataNodeId"]] = defaultdict(lambda: set())
 
 
     @classmethod
     @classmethod
-    def add(cls, entity_id: str, datanode_id: "DataNodeId"):
+    def __add(cls, entity_id: str, datanode_id: "DataNodeId", reason: str):
         cls.datanode_id_submittables[datanode_id].add(entity_id)
         cls.datanode_id_submittables[datanode_id].add(entity_id)
         cls.submittable_id_datanodes[entity_id].add(datanode_id)  # type: ignore
         cls.submittable_id_datanodes[entity_id].add(datanode_id)  # type: ignore
 
 
     @classmethod
     @classmethod
-    def remove(cls, datanode_id: "DataNode"):
+    def __remove(cls, datanode_id: "DataNode"):
         submittable_ids: Set = cls.datanode_id_submittables.pop(datanode_id, set())
         submittable_ids: Set = cls.datanode_id_submittables.pop(datanode_id, set())
         for submittable_id in submittable_ids:
         for submittable_id in submittable_ids:
             cls.submittable_id_datanodes[submittable_id].remove(datanode_id)
             cls.submittable_id_datanodes[submittable_id].remove(datanode_id)
             if len(cls.submittable_id_datanodes[submittable_id]) == 0:
             if len(cls.submittable_id_datanodes[submittable_id]) == 0:
                 # Notifier.publish(make_event(scenario, submittable, UPDATE))
                 # Notifier.publish(make_event(scenario, submittable, UPDATE))
                 cls.submittable_id_datanodes.pop(submittable_id)
                 cls.submittable_id_datanodes.pop(submittable_id)
+
+    @classmethod
+    def _check_submittable_is_ready_to_submit(cls, entity_id: str):
+        return len(SubmittableStatusCache.submittable_id_datanodes.get(entity_id, [])) == 0
+
+    @classmethod
+    def __add_parent_entities_to_submittable_cache(cls, dn: "DataNode", reason: str):
+        from ..scenario.scenario import Scenario
+        from ..sequence.sequence import Sequence
+        from ..task.task import Task
+
+        parent_entities = dn.get_parents()
+
+        for scenario_parent in parent_entities.get(Scenario._MANAGER_NAME, []):
+            SubmittableStatusCache.__add(scenario_parent.id, dn.id, reason)
+        for sequence_parent in parent_entities.get(Sequence._MANAGER_NAME, []):
+            SubmittableStatusCache.__add(sequence_parent.id, dn.id, reason)
+        for task_parent in parent_entities.get(Task._MANAGER_NAME, []):
+            SubmittableStatusCache.__add(task_parent.id, dn.id, reason)
+
+    @classmethod
+    def _compute_if_dn_is_ready_for_reading(cls, dn: "DataNode"):
+        if dn._edit_in_progress:
+            cls.__add_parent_entities_to_submittable_cache(dn, f"DataNode {dn.id} is being edited.")
+        elif not dn._last_edit_date:
+            cls.__add_parent_entities_to_submittable_cache(dn, f"DataNode {dn.id} is not written.")
+        elif dn.is_ready_for_reading:
+            SubmittableStatusCache.__remove(dn.id)

+ 3 - 4
tests/core/scenario/test_scenario_manager.py

@@ -579,8 +579,8 @@ def test_notification_subscribe(mocker):
             Config.configure_task(
             Config.configure_task(
                 "mult_by_2",
                 "mult_by_2",
                 mult_by_2,
                 mult_by_2,
-                [Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)],
-                Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0),
+                [Config.configure_data_node("foo", "pickle", Scope.SCENARIO, default_data=1)],
+                Config.configure_data_node("bar", "pickle", Scope.SCENARIO, default_data=0),
             )
             )
         ],
         ],
     )
     )
@@ -589,7 +589,7 @@ def test_notification_subscribe(mocker):
 
 
     notify_1 = NotifyMock(scenario)
     notify_1 = NotifyMock(scenario)
     notify_2 = NotifyMock(scenario)
     notify_2 = NotifyMock(scenario)
-    mocker.patch.object(_utils, "_load_fct", side_effect=[notify_1, notify_2])
+    mocker.patch.object(_utils, "_load_fct", side_effect=[notify_1, notify_1, notify_1, notify_2, notify_2, notify_2])
 
 
     # test subscribing notification
     # test subscribing notification
     _ScenarioManager._subscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_1, scenario=scenario)
@@ -600,7 +600,6 @@ def test_notification_subscribe(mocker):
 
 
     # test unsubscribing notification
     # test unsubscribing notification
     # test notis subscribe only on new jobs
     # test notis subscribe only on new jobs
-    # mocker.patch.object(_utils, "_load_fct", side_effect=[notify_1, notify_2])
     _ScenarioManager._unsubscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._unsubscribe(callback=notify_1, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_2, scenario=scenario)
     _ScenarioManager._subscribe(callback=notify_2, scenario=scenario)
     _ScenarioManager._submit(scenario)
     _ScenarioManager._submit(scenario)

+ 15 - 15
tests/core/sequence/test_sequence.py

@@ -421,33 +421,33 @@ def test_is_ready_to_run():
     )
     )
     data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", parent_ids={task_1_id}, properties={"default_data": 1})
     data_node_1 = PickleDataNode("foo", Scope.SCENARIO, "s1", parent_ids={task_1_id}, properties={"default_data": 1})
     data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", parent_ids={task_1_id}, properties={"default_data": 2})
     data_node_2 = PickleDataNode("bar", Scope.SCENARIO, "s2", parent_ids={task_1_id}, properties={"default_data": 2})
-    data_node_4 = PickleDataNode(
-        "qux", Scope.SCENARIO, "s4", parent_ids={task_1_id, task_3_id, task_4_id}, properties={"default_data": 4}
+    data_node_3 = PickleDataNode(
+        "qux", Scope.SCENARIO, "s3", parent_ids={task_1_id, task_3_id, task_4_id}, properties={"default_data": 4}
     )
     )
-    data_node_5 = PickleDataNode(
-        "quux", Scope.SCENARIO, "s5", parent_ids={task_2_id, task_3_id}, properties={"default_data": 5}
+    data_node_4 = PickleDataNode(
+        "quux", Scope.SCENARIO, "s4", parent_ids={task_2_id, task_3_id}, properties={"default_data": 5}
     )
     )
-    data_node_6 = PickleDataNode("quuz", Scope.SCENARIO, "s6", parent_ids={task_2_id}, properties={"default_data": 6})
-    data_node_7 = PickleDataNode("corge", Scope.SCENARIO, "s7", parent_ids={task_4_id}, properties={"default_data": 7})
-    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_4], id=task_1_id)
-    task_2 = Task("garply", {}, print, [data_node_6], [data_node_5], id=task_2_id)
-    task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], id=task_3_id)
-    task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], id=task_4_id)
+    data_node_5 = PickleDataNode("quuz", Scope.SCENARIO, "s5", parent_ids={task_2_id}, properties={"default_data": 6})
+    data_node_6 = PickleDataNode("corge", Scope.SCENARIO, "s6", parent_ids={task_4_id}, properties={"default_data": 7})
+    task_1 = Task("grault", {}, print, [data_node_1, data_node_2], [data_node_3], id=task_1_id)
+    task_2 = Task("garply", {}, print, [data_node_5], [data_node_4], id=task_2_id)
+    task_3 = Task("waldo", {}, print, [data_node_4, data_node_3], id=task_3_id)
+    task_4 = Task("fred", {}, print, [data_node_3], [data_node_6], id=task_4_id)
     scenario = Scenario("scenario_config", [task_1, task_2, task_3, task_4], {}, scenario_id=scenario_id)
     scenario = Scenario("scenario_config", [task_1, task_2, task_3, task_4], {}, scenario_id=scenario_id)
 
 
     data_manager = _DataManagerFactory._build_manager()
     data_manager = _DataManagerFactory._build_manager()
-    for dn in [data_node_1, data_node_2, data_node_4, data_node_5, data_node_6, data_node_7]:
+    for dn in [data_node_1, data_node_2, data_node_3, data_node_4, data_node_5, data_node_6]:
         data_manager._set(dn)
         data_manager._set(dn)
     for task in [task_1, task_2, task_3, task_4]:
     for task in [task_1, task_2, task_3, task_4]:
         _TaskManager._set(task)
         _TaskManager._set(task)
     _ScenarioManager._set(scenario)
     _ScenarioManager._set(scenario)
     scenario.add_sequence("sequence", [task_4, task_2, task_1, task_3])
     scenario.add_sequence("sequence", [task_4, task_2, task_1, task_3])
     sequence = scenario.sequences["sequence"]
     sequence = scenario.sequences["sequence"]
-    # s1 ---      s6 ---> t2 ---> s5
+    # s1 ---      s5 ---> t2 ---> s4
     #       |                     |
     #       |                     |
     #       |---> t1 ---|      -----> t3
     #       |---> t1 ---|      -----> t3
     #       |           |      |
     #       |           |      |
-    # s2 ---             ---> s4 ---> t4 ---> s7
+    # s2 ---             ---> s3 ---> t4 ---> s6
 
 
     assert sequence.is_ready_to_run()
     assert sequence.is_ready_to_run()
 
 
@@ -455,12 +455,12 @@ def test_is_ready_to_run():
     assert not sequence.is_ready_to_run()
     assert not sequence.is_ready_to_run()
 
 
     data_node_2.edit_in_progress = True
     data_node_2.edit_in_progress = True
-    data_node_6.edit_in_progress = True
+    data_node_5.edit_in_progress = True
     assert not sequence.is_ready_to_run()
     assert not sequence.is_ready_to_run()
 
 
     data_node_1.edit_in_progress = False
     data_node_1.edit_in_progress = False
     data_node_2.edit_in_progress = False
     data_node_2.edit_in_progress = False
-    data_node_6.edit_in_progress = False
+    data_node_5.edit_in_progress = False
     assert sequence.is_ready_to_run()
     assert sequence.is_ready_to_run()