Ver código fonte

Split test and fix it.

jrobinAV 1 ano atrás
pai
commit
0ec9e25010
1 arquivos alterados com 66 adições e 70 exclusões
  1. 66 70
      tests/core/_orchestrator/test_orchestrator.py

+ 66 - 70
tests/core/_orchestrator/test_orchestrator.py

@@ -24,7 +24,7 @@ from src.taipy.core._orchestrator._orchestrator import _Orchestrator
 from src.taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from src.taipy.core.config.job_config import JobConfig
 from src.taipy.core.data._data_manager import _DataManager
-from src.taipy.core.data.in_memory import InMemoryDataNode
+from src.taipy.core.data.pickle import PickleDataNode
 from src.taipy.core.scenario._scenario_manager import _ScenarioManager
 from src.taipy.core.scenario.scenario import Scenario
 from src.taipy.core.sequence.sequence import Sequence
@@ -37,6 +37,7 @@ from taipy.config.common.scope import Scope
 from taipy.config.exceptions.exceptions import ConfigurationUpdateBlocked
 from tests.core.utils import assert_true_after_time
 
+
 # ################################  USER FUNCTIONS  ##################################
 
 
@@ -98,9 +99,8 @@ def test_submit_task():
 
 
 def test_submit_sequence_generate_unique_submit_id():
-
-    dn_1 = InMemoryDataNode("dn_config_id_1", Scope.SCENARIO)
-    dn_2 = InMemoryDataNode("dn_config_id_2", Scope.SCENARIO)
+    dn_1 = PickleDataNode("dn_config_id_1", Scope.SCENARIO)
+    dn_2 = PickleDataNode("dn_config_id_2", Scope.SCENARIO)
     task_1 = Task("task_config_id_1", {}, print, [dn_1])
     task_2 = Task("task_config_id_2", {}, print, [dn_1], [dn_2])
 
@@ -126,9 +126,9 @@ def test_submit_sequence_generate_unique_submit_id():
 
 
 def test_submit_scenario_generate_unique_submit_id():
-    dn_1 = InMemoryDataNode("dn_config_id_1", Scope.SCENARIO)
-    dn_2 = InMemoryDataNode("dn_config_id_2", Scope.SCENARIO)
-    dn_3 = InMemoryDataNode("dn_config_id_3", Scope.SCENARIO)
+    dn_1 = PickleDataNode("dn_config_id_1", Scope.SCENARIO)
+    dn_2 = PickleDataNode("dn_config_id_2", Scope.SCENARIO)
+    dn_3 = PickleDataNode("dn_config_id_3", Scope.SCENARIO)
     task_1 = Task("task_config_id_1", {}, print, [dn_1])
     task_2 = Task("task_config_id_2", {}, print, [dn_2])
     task_3 = Task("task_config_id_3", {}, print, [dn_3])
@@ -149,9 +149,9 @@ def test_submit_scenario_generate_unique_submit_id():
 
 
 def test_submit_entity_store_entity_id_in_job():
-    dn_1 = InMemoryDataNode("dn_config_id_1", Scope.SCENARIO)
-    dn_2 = InMemoryDataNode("dn_config_id_2", Scope.SCENARIO)
-    dn_3 = InMemoryDataNode("dn_config_id_3", Scope.SCENARIO)
+    dn_1 = PickleDataNode("dn_config_id_1", Scope.SCENARIO)
+    dn_2 = PickleDataNode("dn_config_id_2", Scope.SCENARIO)
+    dn_3 = PickleDataNode("dn_config_id_3", Scope.SCENARIO)
     task_1 = Task("task_config_id_1", {}, print, [dn_1])
     task_2 = Task("task_config_id_2", {}, print, [dn_2])
     task_3 = Task("task_config_id_3", {}, print, [dn_3])
@@ -240,13 +240,12 @@ def test_data_node_not_written_due_to_wrong_result_nb():
 
 
 def test_scenario_only_submit_same_task_once():
-
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
     _OrchestratorFactory._build_dispatcher()
 
-    dn_0 = InMemoryDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
-    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
-    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
+    dn_0 = PickleDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
+    dn_1 = PickleDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
+    dn_2 = PickleDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
     task_1 = Task("task_config_1", {}, print, input=[dn_0], output=[dn_1], id="task_1")
     task_2 = Task("task_config_2", {}, print, input=[dn_1], id="task_2")
     task_3 = Task("task_config_3", {}, print, input=[dn_1], output=[dn_2], id="task_3")
@@ -283,9 +282,9 @@ def test_update_status_fail_job():
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
     _OrchestratorFactory._build_dispatcher()
 
-    dn_0 = InMemoryDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
-    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
-    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
+    dn_0 = PickleDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
+    dn_1 = PickleDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
+    dn_2 = PickleDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
     task_0 = Task("task_config_0", {}, _error, output=[dn_0], id="task_0")
     task_1 = Task("task_config_1", {}, print, input=[dn_0], output=[dn_1], id="task_1")
     task_2 = Task("task_config_2", {}, print, input=[dn_1], id="task_2")
@@ -324,33 +323,32 @@ def test_update_status_fail_job():
     assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
 
 
-def test_update_status_fail_job_in_parallel():
+def test_update_status_fail_job_in_parallel_one_job():
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
     _OrchestratorFactory._build_dispatcher()
 
-    dn_0 = InMemoryDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
-    dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
-    dn_2 = InMemoryDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
+    dn = PickleDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
+    task = Task("task_config_0", {}, _error, output=[dn], id="task_0")
+    _DataManager._set(dn)
+    _TaskManager._set(task)
+    job = _Orchestrator.submit_task(task)
+    assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
+
+
+def test_update_status_fail_job_in_parallel_one_sequence():
+    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
+    _OrchestratorFactory._build_dispatcher()
+
+    dn_0 = PickleDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
+    dn_1 = PickleDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
+    dn_2 = PickleDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
     task_0 = Task("task_config_0", {}, _error, output=[dn_0], id="task_0")
     task_1 = Task("task_config_1", {}, print, input=[dn_0], output=[dn_1], id="task_1")
     task_2 = Task("task_config_2", {}, print, input=[dn_1], id="task_2")
     task_3 = Task("task_config_3", {}, print, input=[dn_2], id="task_3")
-    scenario_1 = Scenario(
-        "scenario_config_1",
-        set([task_0, task_1, task_2, task_3]),
-        {},
-        set(),
-        "scenario_1",
-        sequences={"sequence_1": {"tasks": [task_0, task_1, task_2]}},
-    )
-    scenario_2 = Scenario(
-        "scenario_config_2",
-        set([task_0, task_1, task_2, task_3]),
-        {},
-        set(),
-        "scenario_2",
-    )
-
+    sc = Scenario("scenario_config_1", set([task_0, task_1, task_2, task_3]), {}, set(), "scenario_1",
+                  sequences={"sequence_1": {"tasks": [task_0, task_1, task_2]}})
     _DataManager._set(dn_0)
     _DataManager._set(dn_1)
     _DataManager._set(dn_2)
@@ -358,52 +356,51 @@ def test_update_status_fail_job_in_parallel():
     _TaskManager._set(task_1)
     _TaskManager._set(task_2)
     _TaskManager._set(task_3)
-    _ScenarioManager._set(scenario_1)
-    _ScenarioManager._set(scenario_2)
+    _ScenarioManager._set(sc)
 
-    sequence_1 = scenario_1.sequences["sequence_1"]
-
-    job = _Orchestrator.submit_task(task_0)
-    assert_true_after_time(job.is_failed)
-    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
+    jobs = _Orchestrator.submit(sc.sequences["sequence_1"])
 
-    jobs = _Orchestrator.submit(sequence_1)
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
-    )
+    submit_id = jobs[0].submit_id
+    submission = _SubmissionManager._get(submit_id)
+    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.FAILED)
 
-    jobs = _Orchestrator.submit(scenario_1.sequences["sequence_1"])
-    tasks_jobs = {job._task.id: job for job in jobs}
-    assert_true_after_time(tasks_jobs["task_0"].is_failed)
-    assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
-    assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
-    )
 
-    jobs = _Orchestrator.submit(scenario_1)
-    tasks_jobs = {job._task.id: job for job in jobs}
-    assert_true_after_time(tasks_jobs["task_0"].is_failed)
-    assert_true_after_time(tasks_jobs["task_3"].is_completed)
-    assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
-    assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
-    )
+def test_update_status_fail_job_in_parallel_one_scenario():
+    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
+    _OrchestratorFactory._build_dispatcher()
+
+    dn_0 = PickleDataNode("dn_config_0", Scope.SCENARIO, properties={"default_data": 0})
+    dn_1 = PickleDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
+    dn_2 = PickleDataNode("dn_config_2", Scope.SCENARIO, properties={"default_data": 2})
+    task_0 = Task("task_config_0", {}, _error, output=[dn_0], id="task_0")
+    task_1 = Task("task_config_1", {}, print, input=[dn_0], output=[dn_1], id="task_1")
+    task_2 = Task("task_config_2", {}, print, input=[dn_1], id="task_2")
+    task_3 = Task("task_config_3", {}, print, input=[dn_2], id="task_3")
+    scenario = Scenario("scenario_config_1", set([task_0, task_1, task_2, task_3]), {}, set(), "scenario_1")
+
+    _DataManager._set(dn_0)
+    _DataManager._set(dn_1)
+    _DataManager._set(dn_2)
+    _TaskManager._set(task_0)
+    _TaskManager._set(task_1)
+    _TaskManager._set(task_2)
+    _TaskManager._set(task_3)
+    _ScenarioManager._set(scenario)
+
+    jobs = _Orchestrator.submit(scenario)
 
-    jobs = _Orchestrator.submit(scenario_2)
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(tasks_jobs["task_3"].is_completed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
-    assert_true_after_time(
-        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
-    )
+    submit_id = jobs[0].submit_id
+    submission = _SubmissionManager._get(submit_id)
+    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_task_in_parallel():
@@ -434,7 +431,6 @@ def test_submit_task_in_parallel():
 
 
 def test_submit_sequence_in_parallel():
-
     m = multiprocessing.Manager()
     lock = m.Lock()