Merge pull request #939 from Avaiga/feature/cleaning-dispatcher

Move specific attributes from job_dispatcher to standalone job dispatcher
Jean-Robin 1 year ago
parent
commit
01b16d59d3

+ 10 - 0
taipy/core/_orchestrator/_abstract_orchestrator.py

@@ -21,6 +21,16 @@ from ..task.task import Task
 class _AbstractOrchestrator:
     """Creates, enqueues, and orchestrates jobs as instances of `Job^` class."""
 
+    @property
+    @abstractmethod
+    def jobs_to_run(self):
+        pass
+
+    @property
+    @abstractmethod
+    def blocked_jobs(self):
+        pass
+
     @classmethod
     @abstractmethod
     def initialize(cls):

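For context: an abstract property declared with @property + @abstractmethod is satisfied by any attribute the concrete class binds to that name, including a plain class attribute, which is how _Orchestrator fulfils both members later in this diff (jobs_to_run: Queue = Queue(), blocked_jobs: List = []). A minimal sketch of the mechanism, with toy names rather than the real classes:

    from abc import ABC, abstractmethod
    from queue import Queue

    class Base(ABC):
        @property
        @abstractmethod
        def jobs_to_run(self): ...

    class Concrete(Base):
        # A class attribute clears the abstract marker, so Concrete is instantiable.
        jobs_to_run: Queue = Queue()

    Concrete()  # OK: ABCMeta only checks that the abstract name was overridden
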
+ 3 - 0
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -23,6 +23,9 @@ class _DevelopmentJobDispatcher(_JobDispatcher):
     def __init__(self, orchestrator: _AbstractOrchestrator):
         super().__init__(orchestrator)
 
+    def _can_execute(self) -> bool:
+        return True
+
     def start(self):
         raise NotImplementedError
 

+ 11 - 16
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -13,7 +13,7 @@ import threading
 import time
 from abc import abstractmethod
 from queue import Empty
-from typing import Dict, Optional
+from typing import Optional
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -29,9 +29,9 @@ class _JobDispatcher(threading.Thread):
     """Manages job dispatching (instances of `Job^` class) on executors."""
 
     _STOP_FLAG = False
-    _dispatched_processes: Dict = {}
+    stop_wait = True
+    stop_timeout = None
     _logger = _TaipyLogger._get_logger()
-    _nb_available_workers: int = 1
 
     def __init__(self, orchestrator: _AbstractOrchestrator):
         threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
@@ -55,10 +55,9 @@ class _JobDispatcher(threading.Thread):
             wait (bool): If True, the method will wait for the dispatcher to stop.
             timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
         """
+        self.stop_wait = wait
+        self.stop_timeout = timeout
         self._STOP_FLAG = True
-        if wait and self.is_alive():
-            self._logger.debug("Waiting for the dispatcher thread to stop...")
-            self.join(timeout=timeout)
 
     def run(self):
         self._logger.debug("Job dispatcher started.")
@@ -77,11 +76,15 @@ class _JobDispatcher(threading.Thread):
             except Exception as e:
                 self._logger.exception(e)
                 pass
+        if self.stop_wait:
+            self._logger.debug("Waiting for the dispatcher thread to stop...")
+            self.join(timeout=self.stop_timeout)
         self._logger.debug("Job dispatcher stopped.")
 
+    @abstractmethod
     def _can_execute(self) -> bool:
-        """Returns True if the dispatcher have resources to execute a new job."""
-        return self._nb_available_workers > 0
+        """Returns True if the dispatcher have resources to dispatch a new job."""
+        raise NotImplementedError
 
     def _execute_job(self, job: Job):
         if job.force or self._needs_to_run(job.task):
@@ -141,11 +144,3 @@ class _JobDispatcher(threading.Thread):
     def _update_job_status(job: Job, exceptions):
         job.update_status(exceptions)
         _JobManagerFactory._build_manager()._set(job)
-
-    @classmethod
-    def _set_dispatched_processes(cls, job_id, process):
-        cls._dispatched_processes[job_id] = process
-
-    @classmethod
-    def _pop_dispatched_process(cls, job_id, default=None):
-        return cls._dispatched_processes.pop(job_id, default)  # type: ignore

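Taken together, the base class now follows the template-method shape: the polling loop stays in _JobDispatcher while each subclass answers _can_execute() for itself — the development dispatcher always has room because it runs jobs synchronously, and the standalone dispatcher checks its worker budget. A condensed, self-contained sketch of that split (toy classes, not the full taipy implementations):

    from abc import ABC, abstractmethod

    class JobDispatcher(ABC):
        @abstractmethod
        def _can_execute(self) -> bool:
            """Each subclass decides whether a new job may be dispatched."""
            raise NotImplementedError

    class DevelopmentJobDispatcher(JobDispatcher):
        def _can_execute(self) -> bool:
            return True  # synchronous execution: there is always room

    class StandaloneJobDispatcher(JobDispatcher):
        def __init__(self, max_workers: int = 2):
            self._nb_available_workers = max_workers

        def _can_execute(self) -> bool:
            return self._nb_available_workers > 0

    assert DevelopmentJobDispatcher()._can_execute()
    assert not StandaloneJobDispatcher(max_workers=0)._can_execute()
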
+ 5 - 3
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -34,10 +34,14 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         )  # type: ignore
         self._nb_available_workers = self._executor._max_workers  # type: ignore
 
+    def _can_execute(self) -> bool:
+        """Returns True if the dispatcher have resources to dispatch a job."""
+        return self._nb_available_workers > 0
+
     def run(self):
         with self._executor:
             super().run()
-        self._logger.info("Standalone job dispatcher: Pool executor shut down")
+        self._logger.debug("Standalone job dispatcher: Pool executor shut down")
 
     def _dispatch(self, job: Job):
         """Dispatches the given `Job^` on an available worker for execution.
@@ -50,7 +54,6 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         config_as_string = _TomlSerializer()._serialize(Config._applied_config)  # type: ignore[attr-defined]
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
 
-        self._set_dispatched_processes(job.id, future)  # type: ignore
         future.add_done_callback(self._release_worker)  # We must release the worker before updating the job status
         # so that the worker is available for another job as soon as possible.
         future.add_done_callback(partial(self._update_job_status_from_future, job))
@@ -59,5 +62,4 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         self._nb_available_workers += 1
 
     def _update_job_status_from_future(self, job: Job, ft):
-        self._pop_dispatched_process(job.id)  # type: ignore
         self._update_job_status(job, ft.result())

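With the dispatched-process map gone, the worker counter is the only bookkeeping left, and it is maintained through future callbacks: the release callback is registered first so the slot is freed before the status update runs, as the inline comment above notes. A self-contained sketch of that ordering guarantee with concurrent.futures (toy counter; the real code submits to a process pool and updates taipy job objects):

    from concurrent.futures import ThreadPoolExecutor

    nb_available_workers = 1
    events = []

    def release_worker(_future):
        global nb_available_workers
        nb_available_workers += 1
        events.append("released")

    def update_status(_future):
        events.append("status updated")

    with ThreadPoolExecutor(max_workers=1) as executor:
        nb_available_workers -= 1  # slot taken at dispatch time
        future = executor.submit(lambda: 42)
        # Done-callbacks run in registration order, so the worker is free
        # again before the (potentially slower) status update starts.
        future.add_done_callback(release_worker)
        future.add_done_callback(update_status)

    assert events == ["released", "status updated"]
    assert nb_available_workers == 1
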
+ 3 - 3
taipy/core/_orchestrator/_orchestrator.py

@@ -37,6 +37,7 @@ class _Orchestrator(_AbstractOrchestrator):
 
     jobs_to_run: Queue = Queue()
     blocked_jobs: List = []
+
     lock = Lock()
     __logger = _TaipyLogger._get_logger()
 
@@ -255,7 +256,7 @@ class _Orchestrator(_AbstractOrchestrator):
             cls.__logger.info(f"{job.id} has already failed and cannot be canceled.")
         else:
             with cls.lock:
-                to_cancel_or_abandon_jobs = set([job])
+                to_cancel_or_abandon_jobs = {job}
                 to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys())))
                 cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs)
                 cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs)
@@ -307,10 +308,9 @@ class _Orchestrator(_AbstractOrchestrator):
 
     @classmethod
     def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
-        from ._orchestrator_factory import _OrchestratorFactory
 
         for job in jobs:
-            if job.id in _OrchestratorFactory._dispatcher._dispatched_processes.keys():  # type: ignore
+            if job.is_running():
                 cls.__logger.info(f"{job.id} is running and cannot be canceled.")
             elif job.is_completed():
                 cls.__logger.info(f"{job.id} has already been completed and cannot be canceled.")

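The cancellation guard no longer reaches into the dispatcher's private state: a job is skipped when its own status says it is running, which is also why the late import of _OrchestratorFactory could be dropped. A reduced sketch of the guard's logic (simplified toy Job, not the real class):

    from enum import Enum, auto

    class Status(Enum):
        PENDING = auto()
        RUNNING = auto()
        COMPLETED = auto()

    class Job:
        def __init__(self, status: Status):
            self.status = status

        def is_running(self) -> bool:
            return self.status is Status.RUNNING

        def is_completed(self) -> bool:
            return self.status is Status.COMPLETED

    def can_cancel(job: Job) -> bool:
        # Mirrors the new guard: running and completed jobs are left alone.
        return not (job.is_running() or job.is_completed())

    assert can_cancel(Job(Status.PENDING))
    assert not can_cancel(Job(Status.RUNNING))
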
+ 0 - 3
taipy/core/job/_job_manager.py

@@ -61,9 +61,6 @@ class _JobManager(_Manager[Job], _VersionMixin):
     def _delete(cls, job: Job, force=False):
         if cls._is_deletable(job) or force:
             super()._delete(job.id)
-            from .._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
-
-            _JobDispatcher._pop_dispatched_process(job.id)
         else:
             err = JobNotDeletedException(job.id)
             cls._logger.error(err)

+ 2 - 0
tests/core/_entity/test_migrate_cli.py

@@ -26,6 +26,8 @@ from taipy.core._entity._migrate_cli import _MigrateCLI
 def clean_data_folder():
     if os.path.exists("tests/core/_entity/.data"):
         shutil.rmtree("tests/core/_entity/.data")
+    if os.path.exists("tests/core/_entity/.taipy"):
+        shutil.rmtree("tests/core/_entity/.taipy")
     yield
 
 

+ 2 - 10
tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py

@@ -38,10 +38,10 @@ class MockStandaloneDispatcher(_StandaloneJobDispatcher):
     def __init__(self, orchestrator: _AbstractOrchestrator):
         super(_StandaloneJobDispatcher, self).__init__(orchestrator)
         self._executor: Executor = MockProcessPoolExecutor()
+        self._nb_available_workers = 1
+
         self.dispatch_calls: List = []
         self.release_worker_calls: List = []
-        self.set_dispatch_processes_calls: List = []
-        self.pop_dispatch_processes_calls: List = []
         self.update_job_status_from_future_calls: List = []
 
     def mock_exception_for_job(self, task_id, e: Exception):
@@ -51,14 +51,6 @@ class MockStandaloneDispatcher(_StandaloneJobDispatcher):
         self.dispatch_calls.append(job)
         super()._dispatch(job)
 
-    def _set_dispatched_processes(self, job_id, future):
-        self.set_dispatch_processes_calls.append((job_id, future))
-        super()._set_dispatched_processes(job_id, future)
-
-    def _pop_dispatched_process(self, job_id, default=None):
-        self.pop_dispatch_processes_calls.append(job_id)
-        return super()._pop_dispatched_process(job_id, default)
-
     def _release_worker(self, _):
         self.release_worker_calls.append(None)
         super()._release_worker(_)

+ 6 - 0
tests/core/_orchestrator/_dispatcher/test_development_job_dispatcher.py

@@ -13,6 +13,7 @@ import traceback
 from unittest.mock import patch
 
 from taipy.core import JobId
+from taipy.core._orchestrator._dispatcher import _DevelopmentJobDispatcher
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.job.job import Job
 from taipy.core.task._task_manager_factory import _TaskManagerFactory
@@ -61,3 +62,8 @@ def test_dispatch_executes_the_function_with_exceptions():
     assert job.stacktrace[1] == "".join(traceback.format_exception(type(e_2), value=e_2, tb=e_2.__traceback__))
     assert job.stacktrace[0] == "".join(traceback.format_exception(type(e_1), value=e_1, tb=e_1.__traceback__))
     assert job.is_failed()
+
+
+def test_can_execute():
+    dispatcher = _DevelopmentJobDispatcher(_OrchestratorFactory._orchestrator)
+    assert dispatcher._can_execute()

+ 0 - 12
tests/core/_orchestrator/_dispatcher/test_dispatcher__execute_job.py

@@ -33,18 +33,6 @@ def create_scenario():
     return taipy.create_scenario(sc_conf)
 
 
-def test_can_execute():
-    dispatcher = _JobDispatcher(_OrchestratorFactory._orchestrator)
-    assert dispatcher._nb_available_workers == 1
-    assert dispatcher._can_execute()
-    dispatcher._nb_available_workers = 0
-    assert not dispatcher._can_execute()
-    dispatcher._nb_available_workers = -1
-    assert not dispatcher._can_execute()
-    dispatcher._nb_available_workers = 1
-    assert dispatcher._can_execute()
-
-
 def test_execute_job():
     scenario = create_scenario()
     scenario.t1.skippable = True  # make the job skippable

+ 12 - 9
tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py

@@ -70,11 +70,6 @@ def test_dispatch_job():
     assert submit_first_call[1] == ()
     assert submit_first_call[2]["config_as_string"] == _TomlSerializer()._serialize(Config._applied_config)
 
-    # test that the proc of the job is added to the list of dispatched jobs
-    assert len(dispatcher.set_dispatch_processes_calls) == 1
-    assert dispatcher.set_dispatch_processes_calls[0][0] == job.id
-    assert dispatcher.set_dispatch_processes_calls[0][1] == dispatcher._executor.f[0]
-
     # test that the worker is released after the job is done
     assert len(dispatcher.release_worker_calls) == 1
 
@@ -84,6 +79,18 @@ def test_dispatch_job():
     assert dispatcher.update_job_status_from_future_calls[0][1] == dispatcher._executor.f[0]
 
 
+def test_can_execute():
+    dispatcher = _StandaloneJobDispatcher(_OrchestratorFactory._orchestrator)
+    assert dispatcher._nb_available_workers == 1
+    assert dispatcher._can_execute()
+    dispatcher._nb_available_workers = 0
+    assert not dispatcher._can_execute()
+    dispatcher._nb_available_workers = -1
+    assert not dispatcher._can_execute()
+    dispatcher._nb_available_workers = 1
+    assert dispatcher._can_execute()
+
+
 def test_release_worker():
     dispatcher = _StandaloneJobDispatcher(_OrchestratorFactory._orchestrator)
 
@@ -101,11 +108,7 @@ def test_update_job_status_from_future():
     dispatcher = _StandaloneJobDispatcher(orchestrator)
     ft = Future()
     ft.set_result(None)
-    dispatcher._set_dispatched_processes(job.id, ft)  # the job is dispatched to a process
-
     dispatcher._update_job_status_from_future(job, ft)
-
-    assert len(dispatcher._dispatched_processes) == 0  # the job process is not stored anymore
     assert job.is_completed()
 
 

+ 0 - 3
tests/core/_orchestrator/_dispatcher/test_task_function_wrapper.py

@@ -17,7 +17,6 @@ from taipy.config._serializer._toml_serializer import _TomlSerializer
 from taipy.config.common.scope import Scope
 from taipy.config.exceptions import ConfigurationUpdateBlocked
 from taipy.core._orchestrator._dispatcher._task_function_wrapper import _TaskFunctionWrapper
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.task.task import Task
 
@@ -84,9 +83,7 @@ def test_execute_task_that_returns_single_iterable_output():
     _TaskFunctionWrapper("job_id_list", task_with_list).execute()
 
     assert task_with_tuple.output[f"{task_with_tuple.config_id}_output0"].read() == (42, 21)
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
     assert task_with_list.output[f"{task_with_list.config_id}_output0"].read() == [42, 21]
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
 
 
 def test_data_node_not_written_due_to_wrong_result_nb():

+ 28 - 40
tests/core/_orchestrator/test_orchestrator.py

@@ -14,11 +14,13 @@ import random
 import string
 from functools import partial
 from time import sleep
+from typing import cast
 
 import pytest
 
 from taipy.config import Config
 from taipy.config.common.scope import Scope
+from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
 from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config.job_config import JobConfig
@@ -57,7 +59,7 @@ def test_submit_task_multithreading_multiple_task():
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
 
-    _OrchestratorFactory._build_dispatcher(force_restart=True)
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
     with lock_1:
         with lock_2:
@@ -70,7 +72,7 @@ def test_submit_task_multithreading_multiple_task():
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
-            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
+            assert dispatcher._nb_available_workers == 0
             assert_submission_status(submission_1, SubmissionStatus.RUNNING)
             assert_submission_status(submission_2, SubmissionStatus.RUNNING)
 
@@ -78,14 +80,14 @@ def test_submit_task_multithreading_multiple_task():
         assert_true_after_time(job_1.is_running)
         assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert dispatcher._nb_available_workers == 1
         assert_submission_status(submission_1, SubmissionStatus.RUNNING)
         assert_submission_status(submission_2, SubmissionStatus.COMPLETED)
 
     assert_true_after_time(job_1.is_completed)
     assert job_2.is_completed()
     assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert dispatcher._nb_available_workers == 2
     assert_submission_status(submission_1, SubmissionStatus.COMPLETED)
     assert submission_2.submission_status == SubmissionStatus.COMPLETED
 
@@ -93,17 +95,13 @@ def test_submit_task_multithreading_multiple_task():
 @pytest.mark.orchestrator_dispatcher
 def test_submit_submittable_multithreading_multiple_task():
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-
     m = multiprocessing.Manager()
     lock_1 = m.Lock()
     lock_2 = m.Lock()
-
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
-
-    scenario = Scenario("scenario_config", [task_1, task_2], {})
-
-    _OrchestratorFactory._build_dispatcher(force_restart=True)
+    scenario = Scenario("scenario_config", {task_1, task_2}, {})
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
     with lock_1:
         with lock_2:
@@ -116,20 +114,20 @@ def test_submit_submittable_multithreading_multiple_task():
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 0
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_running)
-            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2  # Two processes used
+            assert dispatcher._nb_available_workers == 0  # Two processes used
             assert_submission_status(submission, SubmissionStatus.RUNNING)
         assert_true_after_time(job_2.is_completed)
         assert_true_after_time(job_1.is_running)
         assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # job 1 is completed: One process used
         assert_submission_status(submission, SubmissionStatus.RUNNING)
+        assert dispatcher._nb_available_workers == 1  # job 2 is completed: One process used
 
     assert_true_after_time(job_1.is_completed)
     assert job_2.is_completed()
     assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
     assert_submission_status(submission, SubmissionStatus.COMPLETED)
+    assert dispatcher._nb_available_workers == 2  # No more process used.
 
 
 @pytest.mark.orchestrator_dispatcher
@@ -142,13 +140,13 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     task_0 = _create_task(partial(lock_multiply, lock_0))
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
-    _OrchestratorFactory._build_dispatcher(force_restart=True)
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
     with lock_0:
         submission_0 = _Orchestrator.submit_task(task_0)
         job_0 = submission_0._jobs[0]
         assert_true_after_time(job_0.is_running)
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert dispatcher._nb_available_workers == 1
         assert_submission_status(submission_0, SubmissionStatus.RUNNING)
         with lock_1:
             with lock_2:
@@ -164,12 +162,12 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
                 assert_submission_status(submission_0, SubmissionStatus.RUNNING)
                 assert_submission_status(submission_1, SubmissionStatus.PENDING)
                 assert_submission_status(submission_2, SubmissionStatus.RUNNING)
-                assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
+                assert dispatcher._nb_available_workers == 0
 
             assert_true_after_time(job_0.is_running)
             assert_true_after_time(job_1.is_running)
             assert_true_after_time(job_2.is_completed)
-            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 2
+            assert dispatcher._nb_available_workers == 0
             assert task_2.output[f"{task_2.config_id}_output0"].read() == 42
             assert task_1.output[f"{task_1.config_id}_output0"].read() == 0
             assert_submission_status(submission_0, SubmissionStatus.RUNNING)
@@ -179,7 +177,7 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
         assert_true_after_time(job_0.is_running)
         assert_true_after_time(job_1.is_completed)
         assert job_2.is_completed()
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
+        assert dispatcher._nb_available_workers == 1
         assert task_1.output[f"{task_1.config_id}_output0"].read() == 42
         assert task_0.output[f"{task_0.config_id}_output0"].read() == 0
         assert_submission_status(submission_0, SubmissionStatus.RUNNING)
@@ -189,7 +187,7 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     assert_true_after_time(job_0.is_completed)
     assert job_1.is_completed()
     assert job_2.is_completed()
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert dispatcher._nb_available_workers == 2
     assert task_0.output[f"{task_0.config_id}_output0"].read() == 42
     assert _SubmissionManager._get(job_0.submit_id).submission_status == SubmissionStatus.COMPLETED
     assert _SubmissionManager._get(job_1.submit_id).submission_status == SubmissionStatus.COMPLETED
@@ -198,17 +196,15 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
 
 @pytest.mark.orchestrator_dispatcher
 def test_blocked_task():
-    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-
+    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=4)
     m = multiprocessing.Manager()
     lock_1 = m.Lock()
     lock_2 = m.Lock()
-
     foo_cfg = Config.configure_data_node("foo", default_data=1)
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
 
-    _OrchestratorFactory._build_dispatcher(force_restart=True)
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]
@@ -216,16 +212,14 @@ def test_blocked_task():
     baz = dns[baz_cfg]
     task_1 = Task("by_2", {}, partial(lock_multiply, lock_1, 2), [foo], [bar])
     task_2 = Task("by_3", {}, partial(lock_multiply, lock_2, 3), [bar], [baz])
-
     assert task_1.foo.is_ready_for_reading  # foo is ready
     assert not task_1.bar.is_ready_for_reading  # But bar is not ready
     assert not task_2.baz.is_ready_for_reading  # neither is baz
-
     assert len(_Orchestrator.blocked_jobs) == 0
     submission_2 = _Orchestrator.submit_task(task_2)
     job_2 = submission_2._jobs[0]  # job 2 is submitted
     assert job_2.is_blocked()  # since bar is not valid, job 2 is blocked
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No process used
+    assert dispatcher._nb_available_workers == 4  # No process used
     assert _SubmissionManager._get(job_2.submit_id).submission_status == SubmissionStatus.BLOCKED
     assert len(_Orchestrator.blocked_jobs) == 1  # One job (job 2) is blocked
     with lock_2:
@@ -233,7 +227,7 @@ def test_blocked_task():
             submission_1 = _Orchestrator.submit_task(task_1)
             job_1 = submission_1._jobs[0]  # job 1 is submitted and locked
             assert_true_after_time(job_1.is_running)  # so it is still running
-            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # One process used for job 1
+            assert dispatcher._nb_available_workers == 3  # One process used for job 1
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert job_2.is_blocked  # the job_2 remains blocked
             assert_submission_status(submission_1, SubmissionStatus.RUNNING)
@@ -242,14 +236,14 @@ def test_blocked_task():
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1  # One process used for job 2
+        assert dispatcher._nb_available_workers == 3  # One process used for job 2
         assert len(_Orchestrator.blocked_jobs) == 0
         assert_submission_status(submission_1, SubmissionStatus.COMPLETED)
         assert_submission_status(submission_2, SubmissionStatus.RUNNING)
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
+    assert dispatcher._nb_available_workers == 4  # No more process used.
     assert submission_1.submission_status == SubmissionStatus.COMPLETED
     assert_submission_status(submission_2, SubmissionStatus.COMPLETED)
 
@@ -257,29 +251,23 @@ def test_blocked_task():
 @pytest.mark.orchestrator_dispatcher
 def test_blocked_submittable():
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-
     m = multiprocessing.Manager()
     lock_1 = m.Lock()
     lock_2 = m.Lock()
-
     foo_cfg = Config.configure_data_node("foo", default_data=1)
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
-
-    _OrchestratorFactory._build_dispatcher(force_restart=True)
-
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]
     bar = dns[bar_cfg]
     baz = dns[baz_cfg]
     task_1 = Task("by_2", {}, partial(lock_multiply, lock_1, 2), [foo], [bar])
     task_2 = Task("by_3", {}, partial(lock_multiply, lock_2, 3), [bar], [baz])
-    scenario = Scenario("scenario_config", [task_1, task_2], {})
-
+    scenario = Scenario("scenario_config", {task_1, task_2}, {})
     assert task_1.foo.is_ready_for_reading  # foo is ready
     assert not task_1.bar.is_ready_for_reading  # But bar is not ready
     assert not task_2.baz.is_ready_for_reading  # neither is baz
-
     assert len(_Orchestrator.blocked_jobs) == 0
     with lock_2:
         with lock_1:
@@ -287,23 +275,23 @@ def test_blocked_submittable():
             tasks_jobs = {job._task.id: job for job in submission._jobs}
             job_1, job_2 = tasks_jobs[task_1.id], tasks_jobs[task_2.id]
             assert_true_after_time(job_1.is_running)  # job 1 is submitted and locked so it is still running
-            assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1
             assert not _DataManager._get(task_1.bar.id).is_ready_for_reading  # And bar still not ready
             assert job_2.is_blocked  # the job_2 remains blocked
             assert_submission_status(submission, SubmissionStatus.RUNNING)
+            assert dispatcher._nb_available_workers == 1
         assert_true_after_time(job_1.is_completed)  # job1 unlocked and can complete
         assert _DataManager._get(task_1.bar.id).is_ready_for_reading  # bar becomes ready
         assert _DataManager._get(task_1.bar.id).read() == 2  # the data is computed and written
         assert_true_after_time(job_2.is_running)  # And job 2 can start running
-        assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 1 # Still one process
         # currently used since the previous process is not used anymore
         assert len(_Orchestrator.blocked_jobs) == 0
         assert_submission_status(submission, SubmissionStatus.RUNNING)
+        assert dispatcher._nb_available_workers == 1  # Still one process
     assert_true_after_time(job_2.is_completed)  # job 2 unlocked so it can complete
     assert _DataManager._get(task_2.baz.id).is_ready_for_reading  # baz becomes ready
     assert _DataManager._get(task_2.baz.id).read() == 6  # the data is computed and written
-    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0  # No more process used.
     assert_submission_status(submission, SubmissionStatus.COMPLETED)
+    assert dispatcher._nb_available_workers == 2  # No more process used.
 
 
 # ################################  UTIL METHODS    ##################################

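Every rewritten assertion in this file follows the same invariant (implied by the diff rather than stated in it): the number of available workers equals the configured maximum minus the jobs currently dispatched and unfinished. A worked check with the two-worker configuration most of these tests use:

    max_nb_of_workers = 2  # as set by Config.configure_job_executions
    for running_jobs, expected_available in [(2, 0), (1, 1), (0, 2)]:
        assert max_nb_of_workers - running_jobs == expected_available
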
+ 4 - 3
tests/core/_orchestrator/test_orchestrator__cancel_jobs.py

@@ -8,10 +8,12 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+from typing import cast
 
 from taipy import Job, JobId, Status
 from taipy.config import Config
 from taipy.core import taipy
+from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.job._job_manager_factory import _JobManagerFactory
 from taipy.core.task._task_manager_factory import _TaskManagerFactory
@@ -89,11 +91,10 @@ def test_cancel_job_with_subsequent_jobs_and_parallel_jobs():
     job3 = orchestrator._lock_dn_output_and_create_job(scenario.t3, "s_id", "e_id")
     job2bis = orchestrator._lock_dn_output_and_create_job(scenario.t2bis, "s_id", "e_id")
     job1.completed()
-
-    job2.running()
+    job2.pending()
     job3.blocked()
     job2bis.pending()
-    orchestrator.blocked_jobs = [job3]
+    cast(_Orchestrator, orchestrator).blocked_jobs = [job3]
 
     orchestrator.cancel_job(job2)
 

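Note that cast (used here and in the other updated tests) is purely an annotation for the static type checker; it performs no runtime conversion or validation. A two-line illustration:

    from typing import cast

    value = cast(int, "still a str")  # no runtime effect; only the type checker sees int
    assert isinstance(value, str)     # at runtime nothing changed
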
+ 22 - 27
tests/core/job/test_job_manager.py

@@ -14,13 +14,14 @@ import random
 import string
 from functools import partial
 from time import sleep
+from typing import cast
 from unittest import mock
 
 import pytest
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
+from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config.job_config import JobConfig
 from taipy.core.data._data_manager import _DataManager
@@ -160,11 +161,10 @@ def test_raise_when_trying_to_delete_unfinished_job():
     task = Task(
         "task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="raise_when_delete_unfinished"
     )
-    _OrchestratorFactory._build_dispatcher()
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher())
     with lock:
         job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
-
-        assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
+        assert_true_after_time(lambda: dispatcher._nb_available_workers == 1)
         assert_true_after_time(job.is_running)
         with pytest.raises(JobNotDeletedException):
             _JobManager._delete(job)
@@ -203,19 +203,17 @@ def test_cancel_single_job():
 
     task = _create_task(multiply, name="cancel_single_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
-    assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
-    _OrchestratorFactory._dispatcher.stop()
-    assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher())
 
+    assert_true_after_time(dispatcher.is_running)
+    dispatcher.stop()
+    assert_true_after_time(lambda: not dispatcher.is_running())
     job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
     assert_true_after_time(job.is_pending)
-    assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 0)
+    assert_true_after_time(lambda: dispatcher._nb_available_workers == 1)
     _JobManager._cancel(job.id)
     assert_true_after_time(job.is_canceled)
-    assert_true_after_time(job.is_canceled)
 
 
 @mock.patch(
@@ -228,11 +226,11 @@ def test_cancel_canceled_abandoned_failed_jobs(cancel_jobs, orchestrated_job):
 
     task = _create_task(multiply, name="test_cancel_canceled_abandoned_failed_jobs")
 
-    _OrchestratorFactory._build_dispatcher()
+    dispatcher = _OrchestratorFactory._build_dispatcher()
 
-    assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
-    _OrchestratorFactory._dispatcher.stop()
-    assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
+    assert_true_after_time(dispatcher.is_running)
+    dispatcher.stop()
+    assert_true_after_time(lambda: not dispatcher.is_running())
 
     job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.canceled()
@@ -265,11 +263,11 @@ def test_cancel_completed_skipped_jobs(cancel_jobs, orchestrated_job):
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=1)
     task = _create_task(multiply, name="cancel_single_job")
 
-    _OrchestratorFactory._build_dispatcher()
+    dispatcher = _OrchestratorFactory._build_dispatcher()
 
-    assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
-    _OrchestratorFactory._dispatcher.stop()
-    assert_true_after_time(lambda: not _OrchestratorFactory._dispatcher.is_running())
+    assert_true_after_time(dispatcher.is_running)
+    dispatcher.stop()
+    assert_true_after_time(lambda: not dispatcher.is_running())
 
     job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job.completed()
@@ -307,22 +305,19 @@ def test_cancel_single_running_job():
     dnm._set(dn_3)
     task = Task("task_config_1", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="cancel_single_job")
 
-    _OrchestratorFactory._build_dispatcher()
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
-    assert_true_after_time(_OrchestratorFactory._dispatcher.is_running)
-    assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 2)
+    assert_true_after_time(dispatcher.is_running)
+    assert_true_after_time(lambda: dispatcher._nb_available_workers == 2)
 
     with lock:
         job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
-
-        assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
-        assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 1)
         assert_true_after_time(job.is_running)
+        assert dispatcher._nb_available_workers == 1
         _JobManager._cancel(job)
         assert_true_after_time(job.is_running)
-    assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 0)
-    assert_true_after_time(lambda: _OrchestratorFactory._dispatcher._nb_available_workers == 2)
     assert_true_after_time(job.is_completed)
+    assert dispatcher._nb_available_workers == 2
 
 
 def test_cancel_subsequent_jobs():

+ 7 - 8
tests/core/job/test_job_manager_with_sql_repo.py

@@ -14,13 +14,14 @@ import random
 import string
 from functools import partial
 from time import sleep
+from typing import cast
 
 import pytest
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
 from taipy.core import Task
-from taipy.core._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
+from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config.job_config import JobConfig
 from taipy.core.data import InMemoryDataNode
@@ -134,10 +135,8 @@ def test_delete_job(init_sql_repo):
 
 
 def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo):
-    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
+    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=3)
 
-    m = multiprocessing.Manager()
-    lock = m.Lock()
     dnm = _DataManagerFactory._build_manager()
     dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
     dnm._set(dn_1)
@@ -145,14 +144,15 @@ def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo):
     dnm._set(dn_2)
     dn_3 = InMemoryDataNode("dn_config_3", Scope.SCENARIO)
     dnm._set(dn_3)
+    proc_manager = multiprocessing.Manager()
+    lock = proc_manager.Lock()
     task = Task("task_cfg", {}, partial(lock_multiply, lock), [dn_1, dn_2], [dn_3], id="raise_when_delete_unfinished")
-    _OrchestratorFactory._build_dispatcher()
+    dispatcher = cast(_StandaloneJobDispatcher, _OrchestratorFactory._build_dispatcher(force_restart=True))
 
     with lock:
         job = _OrchestratorFactory._orchestrator.submit_task(task)._jobs[0]
-
-        assert_true_after_time(lambda: len(_JobDispatcher._dispatched_processes) == 1)
         assert_true_after_time(job.is_running)
+        assert dispatcher._nb_available_workers == 2
         with pytest.raises(JobNotDeletedException):
             _JobManager._delete(job)
         with pytest.raises(JobNotDeletedException):
@@ -178,7 +178,6 @@ def test_force_deleting_unfinished_job(init_sql_repo):
     )
     reference_last_edit_date = dn_3.last_edit_date
     _OrchestratorFactory._build_dispatcher()
-
     with lock:
         job = _OrchestratorFactory._orchestrator.submit_task(task_1)._jobs[0]
         assert_true_after_time(job.is_running)