Explorar o código

#2597 Fix missing parent_ids (#2601)

* #2597 Fix missing parent_ids

* remove unused task_manager._create method
Jean-Robin hai 1 semana
pai
achega
bd9576b5f6

+ 0 - 7
taipy/core/task/_task_manager.py

@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import itertools
 from typing import Callable, List, Optional, Type, Union, cast
 
 from taipy.common.config import Config
@@ -52,12 +51,6 @@ class _TaskManager(_Manager[Task], _VersionMixin):
 
         return _OrchestratorFactory._build_orchestrator()
 
-    @classmethod
-    def _create(cls, task: Task) -> None:
-        for dn in itertools.chain(task.input.values(), task.output.values()):
-            _DataManagerFactory._build_manager()._repository._save(dn)
-        cls._repository._save(task)
-
     @classmethod
     def _get_owner_id(
         cls, scope, cycle_id, scenario_id

+ 3 - 1
tests/core/_orchestrator/_dispatcher/test_dispatcher__update_job_status.py

@@ -14,6 +14,7 @@ from taipy import Job, JobId, Scope, Status, Task
 from taipy.core._orchestrator._dispatcher import _JobDispatcher
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.data import InMemoryDataNode
+from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.data_node_id import EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY
 from taipy.core.job._job_manager_factory import _JobManagerFactory
 from taipy.core.task._task_manager_factory import _TaskManagerFactory
@@ -29,8 +30,9 @@ def _error():
 
 def test_update_job_status_no_exception():
     output = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
+    _DataManagerFactory._build_manager()._repository._save(output)
     task = Task("config_id", {}, nothing, output=[output])
-    _TaskManagerFactory._build_manager()._create(task)
+    _TaskManagerFactory._build_manager()._repository._save(task)
     job = Job(JobId("id"), task, "s_id", task.id)
     _JobManagerFactory._build_manager()._repository._save(job)
 

+ 2 - 0
tests/core/conftest.py

@@ -225,7 +225,9 @@ def data_node_model():
 
 @pytest.fixture(scope="function")
 def task(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     dn = InMemoryDataNode("dn_config_id", Scope.SCENARIO, version="random_version_number")
+    _DataManagerFactory._build_manager()._repository._save(dn)
     return Task("task_config_id", {}, print, [data_node], [dn], TaskId("TASK_task_id"))
 
 

+ 5 - 2
tests/core/job/test_job.py

@@ -26,6 +26,7 @@ from taipy.core._orchestrator._dispatcher._development_job_dispatcher import _De
 from taipy.core._orchestrator._dispatcher._standalone_job_dispatcher import _StandaloneJobDispatcher
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config.job_config import JobConfig
+from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.job._job_manager import _JobManager
 from taipy.core.job._job_manager_factory import _JobManagerFactory
@@ -224,8 +225,9 @@ def test_handle_exception_in_user_function(task_id, job_id):
 
 def test_handle_exception_in_input_data_node(task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task = Task(config_id="name", properties={}, input=[data_node], function=print, output=[], id=task_id)
-    _TaskManagerFactory._build_manager()._create(task)
+    _TaskManagerFactory._build_manager()._repository._save(task)
     submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     _JobManagerFactory._build_manager()._repository._save(job)
@@ -240,8 +242,9 @@ def test_handle_exception_in_input_data_node(task_id, job_id):
 
 def test_handle_exception_in_ouptut_data_node(replace_in_memory_write_fct, task_id, job_id):
     data_node = InMemoryDataNode("data_node", scope=Scope.SCENARIO)
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task = Task(config_id="name", properties={}, input=[], function=_foo, output=[data_node], id=task_id)
-    _TaskManagerFactory._build_manager()._create(task)
+    _TaskManagerFactory._build_manager()._repository._save(task)
     submission = _SubmissionManagerFactory._build_manager()._create(task.id, task._ID_PREFIX, task.config_id)
     job = Job(job_id, task, submission.id, "scenario_entity_id")
     _JobManagerFactory._build_manager()._repository._save(job)

+ 16 - 11
tests/core/scenario/test_scenario.py

@@ -351,14 +351,15 @@ def test_raise_sequence_tasks_not_in_scenario(data_node):
 
 
 def test_adding_sequence_raises_tasks_not_in_scenario(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task_1 = Task("task_1", {}, print, output=[data_node])
     task_2 = Task("task_2", {}, print, input=[data_node])
     scenario = Scenario("scenario", [task_1], {})
     scenario_manager = _ScenarioManagerFactory._build_manager()
     task_manager = _TaskManagerFactory._build_manager()
     scenario_manager._repository._save(scenario)
-    task_manager._create(task_1)
-    task_manager._create(task_2)
+    task_manager._repository._save(task_1)
+    task_manager._repository._save(task_2)
 
     scenario.add_sequences({"sequence_1": {}})
 
@@ -381,10 +382,11 @@ def test_adding_sequence_raises_tasks_not_in_scenario(data_node):
 
 
 def test_adding_existing_sequence_raises_exception(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task_1 = Task("task_1", {}, print, output=[data_node])
-    _TaskManagerFactory._build_manager()._create(task_1)
+    _TaskManagerFactory._build_manager()._repository._save(task_1)
     task_2 = Task("task_2", {}, print, input=[data_node])
-    _TaskManagerFactory._build_manager()._create(task_2)
+    _TaskManagerFactory._build_manager()._repository._save(task_2)
     scenario = Scenario("scenario", tasks={task_1, task_2}, properties={})
     _ScenarioManagerFactory._build_manager()._repository._save(scenario)
 
@@ -394,10 +396,11 @@ def test_adding_existing_sequence_raises_exception(data_node):
 
 
 def test_renaming_existing_sequence_raises_exception(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task_1 = Task("task_1", {}, print, output=[data_node])
-    _TaskManagerFactory._build_manager()._create(task_1)
+    _TaskManagerFactory._build_manager()._repository._save(task_1)
     task_2 = Task("task_2", {}, print, input=[data_node])
-    _TaskManagerFactory._build_manager()._create(task_2)
+    _TaskManagerFactory._build_manager()._repository._save(task_2)
     scenario = Scenario("scenario", {task_1, task_2}, {})
     _ScenarioManagerFactory._build_manager()._repository._save(scenario)
 
@@ -467,11 +470,12 @@ def test_add_rename_and_remove_sequences():
 
 
 def test_update_sequence(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task_1 = Task("foo", {}, print, [data_node], [], TaskId("t1"))
     task_2 = Task("bar", {}, print, [], [data_node], id=TaskId("t2"))
     scenario = Scenario("baz", {task_1, task_2}, {})
-    _TaskManagerFactory._build_manager()._create(task_1)
-    _TaskManagerFactory._build_manager()._create(task_2)
+    _TaskManagerFactory._build_manager()._repository._save(task_1)
+    _TaskManagerFactory._build_manager()._repository._save(task_2)
     _ScenarioManagerFactory._build_manager()._repository._save(scenario)
     scenario.add_sequence("seq_1", [task_1])
 
@@ -486,10 +490,11 @@ def test_update_sequence(data_node):
 
 
 def test_add_rename_and_remove_sequences_within_context(data_node):
+    _DataManagerFactory._build_manager()._repository._save(data_node)
     task_1 = Task("task_1", {}, print, output=[data_node])
     task_2 = Task("task_2", {}, print, input=[data_node])
-    _TaskManagerFactory._build_manager()._create(task_1)
-    _TaskManagerFactory._build_manager()._create(task_2)
+    _TaskManagerFactory._build_manager()._repository._save(task_1)
+    _TaskManagerFactory._build_manager()._repository._save(task_2)
     scenario = Scenario(config_id="scenario", tasks={task_1, task_2}, properties={})
     _ScenarioManagerFactory._build_manager()._repository._save(scenario)
 
@@ -589,7 +594,7 @@ def test_auto_update_and_reload(cycle, current_datetime, task, data_node):
         SequenceId(f"SEQUENCE_{tmp_sequence_name}_{scenario_1.id}"),
     )
 
-    _TaskManagerFactory._build_manager()._create(task)
+    _TaskManagerFactory._build_manager()._repository._save(task)
     _DataManagerFactory._build_manager()._repository._save(data_node)
     _DataManagerFactory._build_manager()._repository._save(additional_dn)
     scenario_manager = _ScenarioManagerFactory._build_manager()

+ 17 - 6
tests/core/scenario/test_scenario_manager.py

@@ -29,6 +29,7 @@ from taipy.core.common.scope import Scope
 from taipy.core.config.scenario_config import ScenarioConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.data._data_manager import _DataManager
+from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions.exceptions import (
     DeletingPrimaryScenario,
@@ -118,7 +119,9 @@ def test_save_and_get_scenario(cycle):
     assert _ScenarioManager._get(scenario_2) is None
 
     # Save a second scenario. Now, we expect to have a total of two scenarios stored
-    _TaskManager._create(task_2)
+    _DataManagerFactory._build_manager()._repository._save(input_dn_2)
+    _DataManagerFactory._build_manager()._repository._save(output_dn_2)
+    _TaskManager._repository._save(task_2)
     _CycleManager._repository._save(cycle)
     _ScenarioManager._repository._save(scenario_2)
     _DataManager._repository._save(additional_dn_2)
@@ -1129,11 +1132,19 @@ def test_submit():
         # scenario, sequence, and tasks do exist.
         # We expect all the tasks to be submitted once,
         # and respecting specific constraints on the order
-        _TaskManager._create(task_1)
-        _TaskManager._create(task_2)
-        _TaskManager._create(task_3)
-        _TaskManager._create(task_4)
-        _TaskManager._create(task_5)
+        _DataManager._repository._save(data_node_1)
+        _DataManager._repository._save(data_node_2)
+        _DataManager._repository._save(data_node_3)
+        _DataManager._repository._save(data_node_4)
+        _DataManager._repository._save(data_node_5)
+        _DataManager._repository._save(data_node_6)
+        _DataManager._repository._save(data_node_7)
+        _DataManager._repository._save(data_node_8)
+        _TaskManager._repository._save(task_1)
+        _TaskManager._repository._save(task_2)
+        _TaskManager._repository._save(task_3)
+        _TaskManager._repository._save(task_4)
+        _TaskManager._repository._save(task_5)
         _ScenarioManager._submit(scenario.id)
         submit_calls = _TaskManager._orchestrator().submit_calls
         assert len(submit_calls) == 5

+ 2 - 2
tests/core/sequence/test_sequence.py

@@ -585,8 +585,8 @@ def test_auto_update_and_reload(task):
     tmp_task = Task("tmp_task_config_id", {}, print, list(task.output.values()), [], TaskId("tmp_task_id"))
     scenario = Scenario("scenario", [task, tmp_task], {}, sequences={"foo": {}})
 
-    _TaskManager._create(task)
-    _TaskManager._create(tmp_task)
+    _TaskManager._repository._save(task)
+    _TaskManager._repository._save(tmp_task)
     _ScenarioManager._repository._save(scenario)
 
     sequence_1 = scenario.sequences["foo"]

+ 16 - 10
tests/core/sequence/test_sequence_manager.py

@@ -71,8 +71,10 @@ def test_raise_sequence_does_not_belong_to_scenario():
 def __init():
     input_dn = InMemoryDataNode("foo", Scope.SCENARIO)
     output_dn = InMemoryDataNode("foo", Scope.SCENARIO)
+    _DataManager._repository._save(input_dn)
+    _DataManager._repository._save(output_dn)
     task = Task("task", {}, print, [input_dn], [output_dn], TaskId("Task_task_id"))
-    _TaskManager._create(task)
+    _TaskManager._repository._save(task)
     scenario = Scenario("scenario", {task}, {}, set())
     _ScenarioManager._repository._save(scenario)
     return scenario, task
@@ -201,8 +203,9 @@ def test_get_all_on_multiple_versions_environment():
 
 def test_is_submittable():
     dn = InMemoryDataNode("dn", Scope.SCENARIO, properties={"default_data": 10})
+    _DataManager._repository._save(dn)
     task = Task("task", {}, print, [dn])
-    _TaskManager._create(task)
+    _TaskManager._repository._save(task)
     scenario = Scenario("scenario", {task}, {}, set())
     _ScenarioManager._repository._save(scenario)
 
@@ -236,6 +239,13 @@ def test_submit():
     data_node_5 = InMemoryDataNode("quux", Scope.SCENARIO, "s5")
     data_node_6 = InMemoryDataNode("quuz", Scope.SCENARIO, "s6")
     data_node_7 = InMemoryDataNode("corge", Scope.SCENARIO, "s7")
+    _DataManager._repository._save(data_node_1)
+    _DataManager._repository._save(data_node_2)
+    _DataManager._repository._save(data_node_3)
+    _DataManager._repository._save(data_node_4)
+    _DataManager._repository._save(data_node_5)
+    _DataManager._repository._save(data_node_6)
+    _DataManager._repository._save(data_node_7)
     task_1 = Task(
         "grault",
         {},
@@ -247,10 +257,10 @@ def test_submit():
     task_2 = Task("garply", {}, print, [data_node_3], [data_node_5], TaskId("t2"))
     task_3 = Task("waldo", {}, print, [data_node_5, data_node_4], [data_node_6], TaskId("t3"))
     task_4 = Task("fred", {}, print, [data_node_4], [data_node_7], TaskId("t4"))
-    _TaskManager._create(task_1)
-    _TaskManager._create(task_2)
-    _TaskManager._create(task_3)
-    _TaskManager._create(task_4)
+    _TaskManager._repository._save(task_1)
+    _TaskManager._repository._save(task_2)
+    _TaskManager._repository._save(task_3)
+    _TaskManager._repository._save(task_4)
     scenario = Scenario("sce", {task_1, task_2, task_3, task_4}, {})
 
     sequence_name = "sequence"
@@ -281,10 +291,6 @@ def test_submit():
 
         # sequence, and tasks does exist. We expect the tasks to be submitted
         # in a specific order
-        _TaskManager._repository._save(task_1)
-        _TaskManager._repository._save(task_2)
-        _TaskManager._repository._save(task_3)
-        _TaskManager._repository._save(task_4)
         sequence = scenario.sequences[sequence_name]
 
         _SequenceManager._submit(sequence.id)

+ 1 - 1
tests/core/task/test_task.py

@@ -52,7 +52,7 @@ def test_task_equals(task):
     task_manager = _TaskManagerFactory()._build_manager()
 
     task_id = task.id
-    task_manager._create(task)
+    task_manager._repository._save(task)
 
     # To test if instance is same type
     dn = CSVDataNode("foo_bar", Scope.SCENARIO, task_id)

+ 3 - 1
tests/core/task/test_task_manager.py

@@ -329,7 +329,9 @@ def test_is_submittable():
 
 def test_submit_task():
     data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
+    _DataManager._repository._save(data_node_1)
     data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
+    _DataManager._repository._save(data_node_2)
     task_1 = Task(
         "grault",
         {},
@@ -356,7 +358,7 @@ def test_submit_task():
         with pytest.raises(NonExistingTask):
             _TaskManager._submit(task_1.id)
 
-        _TaskManager._create(task_1)
+        _TaskManager._repository._save(task_1)
         _TaskManager._submit(task_1)
         call_ids = [call.id for call in MockOrchestrator.submit_calls]
         assert call_ids == [task_1.id]

+ 4 - 2
tests/rest/conftest.py

@@ -26,6 +26,7 @@ from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.common.frequency import Frequency
 from taipy.core.common.scope import Scope
 from taipy.core.cycle._cycle_manager import _CycleManager
+from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.pickle import PickleDataNode
 from taipy.core.job._job_manager import _JobManager
 from taipy.core.task._task_manager import _TaskManager
@@ -170,7 +171,7 @@ def __default_task():
         {"TASK_task_id"},
         properties={"default_data": "In memory Data Source"},
     )
-
+    _DataManagerFactory._build_manager()._repository._save(input_ds)
     output_ds = PickleDataNode(
         "output_ds",
         Scope.SCENARIO,
@@ -179,6 +180,7 @@ def __default_task():
         {"TASK_task_id"},
         properties={"default_data": "In memory Data Source"},
     )
+    _DataManagerFactory._build_manager()._repository._save(output_ds)
     return Task(
         config_id="foo",
         properties={},
@@ -292,7 +294,7 @@ def default_cycle():
 def __create_job():
     task_manager = _TaskManager
     task = __default_task()
-    task_manager._create(task)
+    task_manager._repository._save(task)
     submit_id = f"SUBMISSION_{str(uuid.uuid4())}"
     return Job(id=JobId(f"JOB_{uuid.uuid4()}"), task=task, submit_id=submit_id, submit_entity_id=task.id)
 

+ 1 - 1
tests/rest/test_task.py

@@ -78,7 +78,7 @@ def test_get_all_tasks(client, task_data, default_task_config_list):
 
 
 def test_execute_task(client, default_task):
-    _TaskManagerFactory._build_manager()._create(default_task)
+    _TaskManagerFactory._build_manager()._repository._save(default_task)
 
     # test 404
     user_url = url_for("api.task_submit", task_id="foo")