
Added a new algorithm for updating the submission status

Toan Quach, 1 year ago
Commit
e86284e15c

+ 53 - 45
src/taipy/core/submission/submission.py

@@ -9,7 +9,9 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+import threading
 import uuid
+from collections.abc import MutableSet
 from datetime import datetime
 from typing import Any, List, Optional, Union
 
@@ -18,7 +20,7 @@ from .._entity._labeled import _Labeled
 from .._entity._reload import _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
 from ..job._job_manager_factory import _JobManagerFactory
-from ..job.job import Job, JobId
+from ..job.job import Job, JobId, Status
 from ..notification.event import Event, EventEntityType, EventOperation, _make_event
 from .submission_id import SubmissionId
 from .submission_status import SubmissionStatus
@@ -40,6 +42,7 @@ class Submission(_Entity, _Labeled):
     _ID_PREFIX = "SUBMISSION"
     _MANAGER_NAME = "submission"
     __SEPARATOR = "_"
+    lock = threading.Lock()
 
     def __init__(
         self,
@@ -59,6 +62,14 @@ class Submission(_Entity, _Labeled):
         self._submission_status = submission_status or SubmissionStatus.SUBMITTED
         self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
 
+        self.__abandoned = False
+        self.__completed = False
+
+        self.__is_canceled = False
+        self.__running_jobs: MutableSet[str] = set()
+        self.__blocked_jobs: MutableSet[str] = set()
+        self.__pending_jobs: MutableSet[str] = set()
+
     @staticmethod
     def __new_id() -> str:
         """Generate a unique Submission identifier."""
@@ -136,56 +147,53 @@ class Submission(_Entity, _Labeled):
     def __ge__(self, other):
         return self.creation_date.timestamp() >= other.creation_date.timestamp()
 
-    def _update_submission_status(self, _: Job):
-        abandoned = False
-        canceled = False
-        blocked = False
-        pending = False
-        running = False
-        completed = False
-
-        for job in self.jobs:
-            if not job:
-                continue
-            if job.is_failed():
-                self.submission_status = SubmissionStatus.FAILED  # type: ignore
-                return
-            if job.is_canceled():
-                canceled = True
-                continue
-            if job.is_blocked():
-                blocked = True
-                continue
-            if job.is_pending() or job.is_submitted():
-                pending = True
-                continue
-            if job.is_running():
-                running = True
-                continue
-            if job.is_completed() or job.is_skipped():
-                completed = True
-                continue
-            if job.is_abandoned():
-                abandoned = True
-        if canceled:
-            self.submission_status = SubmissionStatus.CANCELED  # type: ignore
+    def _update_submission_status(self, job: Job):
+        if self._submission_status == SubmissionStatus.FAILED:
             return
-        if abandoned:
-            self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
+
+        job_status = job.status
+
+        if job_status == Status.FAILED:
+            self.submission_status = SubmissionStatus.FAILED  # type: ignore
             return
-        if running:
+
+        with self.lock:
+            if job_status == Status.CANCELED:
+                self.__is_canceled = True
+            elif job_status == Status.BLOCKED:
+                self.__blocked_jobs.add(job.id)
+                self.__pending_jobs.discard(job.id)
+            elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
+                self.__pending_jobs.add(job.id)
+                self.__blocked_jobs.discard(job.id)
+            elif job_status == Status.RUNNING:
+                self.__running_jobs.add(job.id)
+                self.__pending_jobs.discard(job.id)
+            elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
+                self.__completed = True
+                self.__blocked_jobs.discard(job.id)
+                self.__pending_jobs.discard(job.id)
+                self.__running_jobs.discard(job.id)
+            elif job_status == Status.ABANDONED:
+                self.__abandoned = True
+                self.__running_jobs.discard(job.id)
+                self.__blocked_jobs.discard(job.id)
+                self.__pending_jobs.discard(job.id)
+
+        if self.__is_canceled:
+            self.submission_status = SubmissionStatus.CANCELED  # type: ignore
+        elif self.__abandoned:
+            self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
+        elif self.__running_jobs:
             self.submission_status = SubmissionStatus.RUNNING  # type: ignore
-            return
-        if pending:
+        elif self.__pending_jobs:
             self.submission_status = SubmissionStatus.PENDING  # type: ignore
-            return
-        if blocked:
+        elif self.__blocked_jobs:
             self.submission_status = SubmissionStatus.BLOCKED  # type: ignore
-            return
-        if completed:
+        elif self.__completed:
             self.submission_status = SubmissionStatus.COMPLETED  # type: ignore
-            return
-        self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
+        else:
+            self.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
 
 
 @_make_event.register(Submission)
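
The new algorithm keeps per-submission bookkeeping (canceled/abandoned/completed flags plus running, pending, and blocked job-id sets) and, after each per-job update, resolves the submission status in a fixed priority order. Below is a minimal, illustrative sketch of that resolution step only; it is not part of the commit. The SubmissionTally name and resolve helper are hypothetical, and only the precedence mirrors the code above.

# Sketch (not part of the commit): how the per-job bookkeeping resolves
# to a SubmissionStatus. Assumes the taipy-core SubmissionStatus enum
# imported below; SubmissionTally/resolve are hypothetical names.
from dataclasses import dataclass, field
from typing import Set

from src.taipy.core.submission.submission_status import SubmissionStatus


@dataclass
class SubmissionTally:
    is_canceled: bool = False
    abandoned: bool = False
    completed: bool = False
    running_jobs: Set[str] = field(default_factory=set)
    pending_jobs: Set[str] = field(default_factory=set)
    blocked_jobs: Set[str] = field(default_factory=set)

    def resolve(self) -> SubmissionStatus:
        # Same precedence as the new _update_submission_status:
        # canceled > abandoned > running > pending > blocked > completed > undefined.
        if self.is_canceled:
            return SubmissionStatus.CANCELED
        if self.abandoned:
            return SubmissionStatus.UNDEFINED
        if self.running_jobs:
            return SubmissionStatus.RUNNING
        if self.pending_jobs:
            return SubmissionStatus.PENDING
        if self.blocked_jobs:
            return SubmissionStatus.BLOCKED
        if self.completed:
            return SubmissionStatus.COMPLETED
        return SubmissionStatus.UNDEFINED

A FAILED job short-circuits before this resolution (the submission is marked FAILED immediately and never re-evaluated), which is why the tests below expect FAILED to stick even after sibling jobs complete or are abandoned.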

+ 170 - 5
tests/core/_orchestrator/test_orchestrator.py

@@ -27,8 +27,9 @@ from src.taipy.core.data._data_manager import _DataManager
 from src.taipy.core.data.in_memory import InMemoryDataNode
 from src.taipy.core.scenario._scenario_manager import _ScenarioManager
 from src.taipy.core.scenario.scenario import Scenario
-from src.taipy.core.sequence._sequence_manager import _SequenceManager
 from src.taipy.core.sequence.sequence import Sequence
+from src.taipy.core.submission._submission_manager import _SubmissionManager
+from src.taipy.core.submission.submission_status import SubmissionStatus
 from src.taipy.core.task._task_manager import _TaskManager
 from src.taipy.core.task.task import Task
 from taipy.config import Config
@@ -93,6 +94,7 @@ def test_submit_task():
     assert _DataManager._get(output_dn_id).job_ids == [job.id]
     assert _DataManager._get(output_dn_id).is_ready_for_reading
     assert job.is_completed()
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_sequence_generate_unique_submit_id():
@@ -234,6 +236,7 @@ def test_data_node_not_written_due_to_wrong_result_nb():
     assert task.output[f"{task.config_id}_output0"].read() == 0
     assert job.is_failed()
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED
 
 
 def test_scenario_only_submit_same_task_once():
@@ -261,16 +264,19 @@ def test_scenario_only_submit_same_task_once():
     assert len(jobs) == 3
     assert all([job.is_completed() for job in jobs])
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
     jobs = _Orchestrator.submit(sequence_1)
     assert len(jobs) == 2
     assert all([job.is_completed() for job in jobs])
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
     jobs = _Orchestrator.submit(sequence_2)
     assert len(jobs) == 2
     assert all([job.is_completed() for job in jobs])
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_update_status_fail_job():
@@ -299,6 +305,7 @@ def test_update_status_fail_job():
 
     job = _Orchestrator.submit_task(task_0)
     assert job.is_failed()
+    assert _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED
 
     jobs = _Orchestrator.submit(scenario_1)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -306,6 +313,7 @@ def test_update_status_fail_job():
     assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
     assert tasks_jobs["task_3"].is_completed()
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
 
     jobs = _Orchestrator.submit(scenario_2)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -313,6 +321,7 @@ def test_update_status_fail_job():
     assert all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]])
     assert tasks_jobs["task_3"].is_completed()
     assert all(not _Orchestrator._is_blocked(job) for job in jobs)
+    assert _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
 
 
 def test_update_status_fail_job_in_parallel():
@@ -356,18 +365,25 @@ def test_update_status_fail_job_in_parallel():
 
     job = _Orchestrator.submit_task(task_0)
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
     jobs = _Orchestrator.submit(sequence_1)
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_1.sequences["sequence_1"])
     tasks_jobs = {job._task.id: job for job in jobs}
     assert_true_after_time(tasks_jobs["task_0"].is_failed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_1)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -375,6 +391,9 @@ def test_update_status_fail_job_in_parallel():
     assert_true_after_time(tasks_jobs["task_3"].is_completed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
     jobs = _Orchestrator.submit(scenario_2)
     tasks_jobs = {job._task.id: job for job in jobs}
@@ -382,6 +401,9 @@ def test_update_status_fail_job_in_parallel():
     assert_true_after_time(tasks_jobs["task_3"].is_completed)
     assert_true_after_time(lambda: all([job.is_abandoned() for job in [tasks_jobs["task_1"], tasks_jobs["task_2"]]]))
     assert_true_after_time(lambda: all(not _Orchestrator._is_blocked(job) for job in jobs))
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
 
 def test_submit_task_in_parallel():
@@ -399,9 +421,15 @@ def test_submit_task_in_parallel():
         job = _Orchestrator.submit_task(task)
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -422,9 +450,15 @@ def test_submit_sequence_in_parallel():
         job = _Orchestrator.submit(sequence)[0]
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -444,9 +478,15 @@ def test_submit_scenario_in_parallel():
         job = _Orchestrator.submit(scenario)[0]
         assert_true_after_time(job.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task.output[f"{task.config_id}_output0"].read() == 42)
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
     assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
@@ -469,6 +509,9 @@ def test_submit_task_synchronously_in_parallel():
     job = _Orchestrator.submit_task(task, wait=True)
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_sequence_synchronously_in_parallel():
@@ -483,6 +526,9 @@ def test_submit_sequence_synchronously_in_parallel():
     job = _Orchestrator.submit(sequence, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_scenario_synchronously_in_parallel():
@@ -497,6 +543,9 @@ def test_submit_scenario_synchronously_in_parallel():
     job = _Orchestrator.submit(scenario, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_fail_task_synchronously_in_parallel():
@@ -509,6 +558,7 @@ def test_submit_fail_task_synchronously_in_parallel():
     job = _Orchestrator.submit_task(task, wait=True)
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_fail_sequence_synchronously_in_parallel():
@@ -523,6 +573,7 @@ def test_submit_fail_sequence_synchronously_in_parallel():
     job = _Orchestrator.submit(sequence, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_fail_scenario_synchronously_in_parallel():
@@ -537,6 +588,7 @@ def test_submit_fail_scenario_synchronously_in_parallel():
     job = _Orchestrator.submit(scenario, wait=True)[0]
     assert (datetime.now() - start_time).seconds >= sleep_period
     assert_true_after_time(job.is_failed)
+    assert_true_after_time(lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.FAILED)
 
 
 def test_submit_task_synchronously_in_parallel_with_timeout():
@@ -553,6 +605,9 @@ def test_submit_task_synchronously_in_parallel_with_timeout():
 
     assert timeout_duration <= (end_time - start_time).seconds
     assert_true_after_time(job.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_task_multithreading_multiple_task():
@@ -577,17 +632,34 @@ def test_submit_task_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
 
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
+
+    assert job_2.is_completed()
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_sequence_multithreading_multiple_task():
@@ -615,17 +687,28 @@ def test_submit_sequence_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
 
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
+
+    assert job_2.is_completed()
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_submit_scenario_multithreading_multiple_task():
@@ -653,20 +736,29 @@ def test_submit_scenario_multithreading_multiple_task():
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
-
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
         assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
 
     assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
     assert_true_after_time(job_1.is_completed)
-    assert_true_after_time(job_2.is_completed)
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(job_2.is_completed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_status():
+    # TODO
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
 
     m = multiprocessing.Manager()
@@ -684,6 +776,9 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
         job_0 = _Orchestrator.submit_task(task_0)
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
         with lock_1:
             with lock_2:
                 assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
@@ -693,6 +788,15 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
                 assert_true_after_time(job_0.is_running)
                 assert_true_after_time(job_1.is_pending)
                 assert_true_after_time(job_2.is_running)
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+                )
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.PENDING
+                )
+                assert_true_after_time(
+                    lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+                )
                 assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
             assert_true_after_time(lambda: task_2.output[f"{task_2.config_id}_output0"].read() == 42)
@@ -700,13 +804,30 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
             assert_true_after_time(job_0.is_running)
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_completed)
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+            )
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2)
 
         assert_true_after_time(lambda: task_1.output[f"{task_1.config_id}_output0"].read() == 42)
         assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(job_1.is_completed)
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
+
         assert job_2.is_completed()
+        assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
 
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
@@ -714,6 +835,9 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     assert job_0.is_completed()
     assert job_1.is_completed()
     assert job_2.is_completed()
+    assert _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
 
 
 def test_blocked_task():
@@ -744,6 +868,7 @@ def test_blocked_task():
     job_2 = _Orchestrator.submit_task(task_2)  # job 2 is submitted first
     assert job_2.is_blocked()  # since bar is not is_valid the job 2 is blocked
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
     assert len(_Orchestrator.blocked_jobs) == 1
     with lock_2:
         with lock_1:
@@ -754,16 +879,32 @@ def test_blocked_task():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
+            )
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+        )
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_blocked_sequence():
@@ -801,16 +942,25 @@ def test_blocked_sequence():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_blocked_scenario():
@@ -848,16 +998,25 @@ def test_blocked_scenario():
             assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert_true_after_time(job_2.is_blocked)  # the job_2 remains blocked
+            assert_true_after_time(
+                lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+            )
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
         assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1)
         assert len(_Orchestrator.blocked_jobs) == 0
+        assert_true_after_time(
+            lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.RUNNING
+        )
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
     assert_true_after_time(lambda: len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def test_task_orchestrator_create_synchronous_dispatcher():
@@ -900,6 +1059,9 @@ def test_can_exec_task_with_modified_config():
     assert_true_after_time(
         jobs[0].is_completed
     )  # If the job is completed, that means the asserts in the task are successful
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.COMPLETED
+    )
 
 
 def update_config_task(n):
@@ -940,6 +1102,9 @@ def test_cannot_exec_task_that_update_config():
 
     # The job should fail due to an exception is raised
     assert_true_after_time(jobs[0].is_failed)
+    assert_true_after_time(
+        lambda: _SubmissionManager._get(jobs[0].submit_id).submission_status == SubmissionStatus.FAILED
+    )
 
 
 def test_can_execute_task_with_development_mode():

+ 0 - 1
tests/core/data/test_parquet_data_node.py

@@ -500,7 +500,6 @@ class TestParquetDataNode:
         dn.write(df)
 
         assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
-        print(dn.read())
         assert set(dn.read().columns) == set(read_kwargs["columns"])
 
         # !!! filter doesn't work with `fastparquet` without partition_cols

+ 0 - 2
tests/core/notification/test_notifier.py

@@ -108,8 +108,6 @@ def test_register():
 
     Notifier.unregister(registration_id_1)
     assert len(Notifier._topics_registrations_list.keys()) == 2
-
-    print(Notifier._topics_registrations_list.keys())
     assert all(topic not in Notifier._topics_registrations_list.keys() for topic in [topic_0, topic_1])
 
     Notifier.unregister(registration_id_2)

+ 424 - 14
tests/core/submission/test_submission.py

@@ -89,7 +89,7 @@ class MockJob:
         return self.status == Status.SUBMITTED
 
 
-def mock_get_jobs(job_ids):
+def __test_update_submission_status(job_ids, expected_submission_status):
     jobs = {
         "job0_submitted": MockJob("job0_submitted", Status.SUBMITTED),
         "job1_failed": MockJob("job1_failed", Status.FAILED),
@@ -101,20 +101,13 @@ def mock_get_jobs(job_ids):
         "job7_skipped": MockJob("job7_skipped", Status.SKIPPED),
         "job8_abandoned": MockJob("job8_abandoned", Status.ABANDONED),
     }
-    return [jobs[job_id] for job_id in job_ids]
-
 
-def __test_update_submission_status(job_ids, expected_submission_status):
-    with (
-        patch(
-            "src.taipy.core.submission.submission.Submission.jobs",
-            new_callable=mock.PropertyMock,
-            return_value=(mock_get_jobs(job_ids)),
-        )
-    ):
-        submission = Submission("submission_id", "ENTITY_TYPE")
-        submission._update_submission_status(None)
-        assert submission.submission_status == expected_submission_status
+    submission = Submission("submission_id", "ENTITY_TYPE")
+    submission.jobs = [jobs[job_id] for job_id in job_ids]
+    for job_id in job_ids:
+        job = jobs[job_id]
+        submission._update_submission_status(job)
+    assert submission.submission_status == expected_submission_status
 
 
 @pytest.mark.parametrize(
@@ -325,3 +318,420 @@ def test_auto_set_and_reload():
     assert submission_1.submission_status == SubmissionStatus.PENDING
     assert submission_2.jobs == [job_1]
     assert submission_2.submission_status == SubmissionStatus.PENDING
+
+
+@pytest.mark.parametrize(
+    "job_statuses, expected_submission_statuses",
+    [
+        (
+            [Status.SUBMITTED, Status.PENDING, Status.RUNNING, Status.COMPLETED],
+            [SubmissionStatus.PENDING, SubmissionStatus.PENDING, SubmissionStatus.RUNNING, SubmissionStatus.COMPLETED],
+        ),
+        (
+            [Status.SUBMITTED, Status.PENDING, Status.RUNNING, Status.SKIPPED],
+            [SubmissionStatus.PENDING, SubmissionStatus.PENDING, SubmissionStatus.RUNNING, SubmissionStatus.COMPLETED],
+        ),
+        (
+            [Status.SUBMITTED, Status.PENDING, Status.RUNNING, Status.FAILED],
+            [SubmissionStatus.PENDING, SubmissionStatus.PENDING, SubmissionStatus.RUNNING, SubmissionStatus.FAILED],
+        ),
+        (
+            [Status.SUBMITTED, Status.PENDING, Status.CANCELED],
+            [SubmissionStatus.PENDING, SubmissionStatus.PENDING, SubmissionStatus.CANCELED],
+        ),
+        (
+            [Status.SUBMITTED, Status.PENDING, Status.RUNNING, Status.CANCELED],
+            [SubmissionStatus.PENDING, SubmissionStatus.PENDING, SubmissionStatus.RUNNING, SubmissionStatus.CANCELED],
+        ),
+        ([Status.SUBMITTED, Status.BLOCKED], [SubmissionStatus.PENDING, SubmissionStatus.BLOCKED]),
+        ([Status.SUBMITTED, Status.SKIPPED], [SubmissionStatus.PENDING, SubmissionStatus.COMPLETED]),
+    ],
+)
+def test_update_submission_status_with_single_job_completed(job_statuses, expected_submission_statuses):
+    job = MockJob("job_id", Status.SUBMITTED)
+    submission = Submission("submission_id", "ENTITY_TYPE")
+
+    assert submission.submission_status == SubmissionStatus.SUBMITTED
+
+    for job_status, submission_status in zip(job_statuses, expected_submission_statuses):
+        job.status = job_status
+        submission._update_submission_status(job)
+        assert submission.submission_status == submission_status
+
+
+def __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses):
+    jobs = {job_id: MockJob(job_id, Status.SUBMITTED) for job_id in job_ids}
+    submission = Submission("submission_id", "ENTITY_TYPE")
+
+    assert submission.submission_status == SubmissionStatus.SUBMITTED
+
+    for (job_id, job_status), submission_status in zip(job_statuses, expected_submission_statuses):
+        job = jobs[job_id]
+        job.status = job_status
+        submission._update_submission_status(job)
+        assert submission.submission_status == submission_status
+
+
+@pytest.mark.parametrize(
+    "job_ids, job_statuses, expected_submission_statuses",
+    [
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_2", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.COMPLETED),
+                ("job_2", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.COMPLETED),
+                ("job_2", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.BLOCKED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_2", Status.COMPLETED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_1", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.BLOCKED,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+    ],
+)
+def test_update_submission_status_with_two_jobs_completed(job_ids, job_statuses, expected_submission_statuses):
+    __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses)
+
+
+@pytest.mark.parametrize(
+    "job_ids, job_statuses, expected_submission_statuses",
+    [
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_2", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.SKIPPED),
+                ("job_1", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.SKIPPED),
+                ("job_1", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.BLOCKED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_2", Status.COMPLETED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.SKIPPED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.BLOCKED,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_2", Status.PENDING),
+                ("job_1", Status.SKIPPED),
+                ("job_2", Status.SKIPPED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.SKIPPED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.SKIPPED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.BLOCKED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.SKIPPED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.SKIPPED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.BLOCKED,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.COMPLETED,
+            ],
+        ),
+    ],
+)
+def test_update_submission_status_with_two_jobs_skipped(job_ids, job_statuses, expected_submission_statuses):
+    __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses)
+
+
+@pytest.mark.parametrize(
+    "job_ids, job_statuses, expected_submission_statuses",
+    [
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_2", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.FAILED),
+                ("job_2", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.FAILED,
+                SubmissionStatus.FAILED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.COMPLETED),
+                ("job_2", Status.FAILED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.FAILED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.BLOCKED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_2", Status.FAILED),
+                ("job_1", Status.ABANDONED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.FAILED,
+                SubmissionStatus.FAILED,
+            ],
+        ),
+    ],
+)
+def test_update_submission_status_with_two_jobs_failed(job_ids, job_statuses, expected_submission_statuses):
+    __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses)
+
+
+@pytest.mark.parametrize(
+    "job_ids, job_statuses, expected_submission_statuses",
+    [
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_2", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.CANCELED),
+                ("job_2", Status.COMPLETED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.CANCELED,
+                SubmissionStatus.CANCELED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.PENDING),
+                ("job_1", Status.RUNNING),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_1", Status.COMPLETED),
+                ("job_2", Status.CANCELED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.CANCELED,
+            ],
+        ),
+        (
+            ["job_1", "job_2"],
+            [
+                ("job_1", Status.SUBMITTED),
+                ("job_2", Status.SUBMITTED),
+                ("job_1", Status.BLOCKED),
+                ("job_2", Status.PENDING),
+                ("job_2", Status.RUNNING),
+                ("job_2", Status.CANCELED),
+                ("job_1", Status.ABANDONED),
+            ],
+            [
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.PENDING,
+                SubmissionStatus.RUNNING,
+                SubmissionStatus.CANCELED,
+                SubmissionStatus.CANCELED,
+            ],
+        ),
+    ],
+)
+def test_update_submission_status_with_two_jobs_canceled(job_ids, job_statuses, expected_submission_statuses):
+    __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses)