Browse Source

Add message with all job statuses when update status fails.

jrobinAV 1 year ago
parent
commit
7a7e4efea6

+ 1 - 1
tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py

@@ -128,6 +128,6 @@ def test_run():
     with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._execute_job") as mck:
         dispatcher = _StandaloneJobDispatcher(orchestrator)
         dispatcher.start()
-        assert_true_after_time(lambda: mck.call_count == 4, msg="The 4 jobs were not dequeued.", time=5)
+        assert_true_after_time(lambda: mck.call_count == 4, time=5, msg="The 4 jobs were not dequeued.")
         dispatcher.stop()
         mck.assert_has_calls([call(job_1), call(job_2), call(job_3), call(job_4)])

+ 64 - 69
tests/core/_orchestrator/test_orchestrator.py

@@ -27,7 +27,8 @@ from taipy.core.scenario.scenario import Scenario
 from taipy.core.submission._submission_manager import _SubmissionManager
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.task.task import Task
-from tests.core.utils import assert_true_after_time
+from tests.core.utils import assert_true_after_time, assert_submission_status
+
 
 # ################################  USER FUNCTIONS  ##################################
 
@@ -70,24 +71,23 @@ def test_submit_task_multithreading_multiple_task():
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
-            assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
-            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
+            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
+            assert_submission_status(submission_1, SubmissionStatus.RUNNING)
+            assert_submission_status(submission_2, SubmissionStatus.RUNNING)
 
-        assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
-        assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
-        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
-        assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
+        assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
+        assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert_submission_status(submission_1, SubmissionStatus.RUNNING)
+        assert_submission_status(submission_2, SubmissionStatus.COMPLETED)
 
-    assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
-
     assert job_2.is_completed()
+    assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert_submission_status(submission_1, SubmissionStatus.COMPLETED)
     assert submission_2.submission_status == SubmissionStatus.COMPLETED
 
 
@@ -117,45 +117,40 @@ def test_submit_submittable_multithreading_multiple_task():
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
-            assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-            assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
-        assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
-        assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
+            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2  # Two processes used
+            assert_submission_status(submission, SubmissionStatus.RUNNING)
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
-        assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
+        assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
+        assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # job 1 is completed: One process used
+        assert_submission_status(submission, SubmissionStatus.RUNNING)
 
-    assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert_true_after_time(job_2.is_completed)
-    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.COMPLETED)
+    assert job_2.is_completed()
+    assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
+    assert_submission_status(submission, SubmissionStatus.COMPLETED)
 
 
 @pytest.mark.orchestrator_dispatcher
 def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_status():
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-
     m = multiprocessing.Manager()
     lock_0 = m.Lock()
     lock_1 = m.Lock()
     lock_2 = m.Lock()
-
     task_0 = _create_task(partial(lock_multiply, lock_0))
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
-
     _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_0:
         submission_0 = _Orchestrator.submit_task(task_0)
         job_0 = submission_0._jobs[0]
         assert_true_after_time(job_0.is_running)
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
-        assert_true_after_time(
-            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
-        )
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert_submission_status(submission_0, SubmissionStatus.RUNNING)
         with lock_1:
             with lock_2:
                 assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
@@ -164,40 +159,39 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
                 job_2 = submission_2._jobs[0]
                 submission_1 = _Orchestrator.submit_task(task_1)
                 job_1 = submission_1._jobs[0]
-                assert_true_after_time(job_0.is_running)
+                assert job_0.is_running()
                 assert_true_after_time(job_1.is_pending)
                 assert_true_after_time(job_2.is_running)
-                assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
-                assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.PENDING)
-                assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
-                assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+                assert_submission_status(submission_0, SubmissionStatus.RUNNING)
+                assert_submission_status(submission_1, SubmissionStatus.PENDING)
+                assert_submission_status(submission_2, SubmissionStatus.RUNNING)
+                assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
 
-            assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
-            assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
             assert_true_after_time(job_0.is_running)
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_completed)
-            assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
-            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
-            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
-            assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
+            assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
+            assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
+            assert_submission_status(submission_0, SubmissionStatus.RUNNING)
+            assert_submission_status(submission_1, SubmissionStatus.RUNNING)
+            assert_submission_status(submission_2, SubmissionStatus.COMPLETED)
 
-        assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
-        assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(job_1.is_completed)
-        assert_true_after_time(lambda: submission_0.submission_status == SubmissionStatus.RUNNING)
-        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
-
         assert job_2.is_completed()
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
+        assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
+        assert_submission_status(submission_0, SubmissionStatus.RUNNING)
+        assert_submission_status(submission_1, SubmissionStatus.COMPLETED)
         assert submission_2.submission_status == SubmissionStatus.COMPLETED
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
 
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert task_0.output[f"{task_0.config_id}_output0"].read() == 42
-    assert job_0.is_completed()
+    assert_true_after_time(job_0.is_completed)
     assert job_1.is_completed()
     assert job_2.is_completed()
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert task_0.output[f"{task_0.config_id}_output0"].read() == 42
     assert _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.COMPLETED
     assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
     assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
@@ -230,35 +224,35 @@ def test_blocked_task():
 
     assert len(_Orchestrator.blocked_jobs) == 0
     submission_2 = _Orchestrator.submit_task(task_2)
-    job_2 = submission_2._jobs[0]  # job 2 is submitted first
+    job_2 = submission_2._jobs[0]  # job 2 is submitted
     assert job_2.is_blocked()  # since bar is not is_valid the job 2 is blocked
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No process used
     assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
-    assert len(_Orchestrator.blocked_jobs) == 1
+    assert len(_Orchestrator.blocked_jobs) == 1  # One job (job 2) is blocked
     with lock_2:
         with lock_1:
             submission_1 = _Orchestrator.submit_task(task_1)
             job_1 = submission_1._jobs[0]  # job 1 is submitted and locked
             assert_true_after_time(job_1.is_running)  # so it is still running
-            assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # One process used for job 1
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
-            assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
-            assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.RUNNING)
-            assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.BLOCKED)
+            assert job_2.is_blocked  # the job_2 remains blocked
+            assert_submission_status(submission_1, SubmissionStatus.RUNNING)
+            assert_submission_status(submission_2, SubmissionStatus.BLOCKED)
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # One process used for job 2
         assert len(_Orchestrator.blocked_jobs) == 0
-        assert_true_after_time(lambda: submission_1.submission_status == SubmissionStatus.COMPLETED)
-        assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.RUNNING)
+        assert_submission_status(submission_1, SubmissionStatus.COMPLETED)
+        assert_submission_status(submission_2, SubmissionStatus.RUNNING)
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
     assert submission_1.submission_status == SubmissionStatus.COMPLETED
-    assert_true_after_time(lambda: submission_2.submission_status == SubmissionStatus.COMPLETED)
+    assert_submission_status(submission_2, SubmissionStatus.COMPLETED)
 
 
 @pytest.mark.orchestrator_dispatcher
@@ -294,22 +288,23 @@ def test_blocked_submittable():
             tasks_jobs = {job._task.id: job for job in submission._jobs}
             job_1, job_2 = tasks_jobs[task_1.id], tasks_jobs[task_2.id]
             assert_true_after_time(job_1.is_running)  # job 1 is submitted and locked so it is still running
-            assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
-            assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
-            assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
+            assert job_2.is_blocked  # the job_2 remains blocked
+            assert_submission_status(submission, SubmissionStatus.RUNNING)
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
-        assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1 # Still one process
+        # currently used since the previous process is not used anymore
         assert len(_Orchestrator.blocked_jobs) == 0
-        assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.RUNNING)
+        assert_submission_status(submission, SubmissionStatus.RUNNING)
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
-    assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
-    assert_true_after_time(lambda: submission.submission_status == SubmissionStatus.COMPLETED)
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
+    assert_submission_status(submission, SubmissionStatus.COMPLETED)
 
 
 # ################################  UTIL METHODS    ##################################

+ 4 - 6
tests/core/job/test_job_manager.py

@@ -411,12 +411,10 @@ def test_cancel_subsequent_jobs():
     assert_true_after_time(job_4.is_canceled)
     assert_true_after_time(job_5.is_abandoned)
     assert_true_after_time(job_6.is_abandoned)
-    assert_true_after_time(
-        lambda: all(
-            not _OrchestratorFactory._orchestrator._is_blocked(job)
-            for job in [job_1, job_2, job_3, job_4, job_5, job_6]
-        )
-    )
+    assert_true_after_time(lambda: all(
+        not _OrchestratorFactory._orchestrator._is_blocked(job)
+        for job in [job_1, job_2, job_3, job_4, job_5, job_6]
+    ))
     assert_true_after_time(lambda: _OrchestratorFactory._orchestrator.jobs_to_run.qsize() == 0)
 
 

+ 2 - 2
tests/core/test_core_cli.py

@@ -458,7 +458,7 @@ def test_force_override_production_version():
 
 
 @pytest.mark.standalone
-def test_modify_job_configuration_dont_stop_application(caplog, init_config):
+def test_modified_job_configuration_dont_block_application_run(caplog, init_config):
     scenario_config = config_scenario()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
@@ -485,7 +485,7 @@ def test_modify_job_configuration_dont_stop_application(caplog, init_config):
 
 
 @pytest.mark.standalone
-def test_modify_config_properties_without_force(caplog, init_config):
+def test_modified_config_properties_without_force(caplog, init_config):
     scenario_config = config_scenario()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):

+ 25 - 2
tests/core/utils/__init__.py

@@ -8,12 +8,13 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+from taipy import Submission
 
 from datetime import datetime
 from time import sleep
 
 
-def assert_true_after_time(assertion, msg=None, time=120):
+def assert_true_after_time(assertion, time=120, msg=None, **msg_params):
     loops = 0
     start = datetime.now()
     while (datetime.now() - start).seconds < time:
@@ -26,5 +27,27 @@ def assert_true_after_time(assertion, msg=None, time=120):
             loops += 1
             continue
     if msg:
-        print(msg)  # noqa: T201
+        print(msg(**msg_params))  # noqa: T201
     assert assertion()
+
+
+def assert_submission_status(submission: Submission, expected_status, timeout=120):
+    assert_true_after_time(
+        lambda: submission.submission_status == expected_status,
+        time=timeout,
+        msg=submission_status_message,
+        submission=submission,
+        timeout=timeout)
+
+
+def submission_status_message(submission: Submission, expected_status, timeout=120):
+    ms = "\n--------------------------------------------------------------------------------\n"
+    ms += f"Submission status is {submission.submission_status} after {timeout} seconds.\n"
+    ms += f"                Expected status was {expected_status}.\n"
+    ms += "--------------------------------------------------------------------------------\n"
+    ms += "                              --------------                                    \n"
+    ms += "                               Job statuses                                     \n"
+    for job in submission.jobs:
+        ms += f"{job.id}: {job.status}\n"
+    ms += "--------------------------------------------------------------------------------\n"
+    return ms