Переглянути джерело

Revert "2- REMOVE dispatcher folder"

This reverts commit 04ad120fdf96364d337a6c353db093d85f7e08f7.
jrobinAV 1 рік тому
батько
коміт
5e521e955e

+ 10 - 0
tests/core/_orchestrator/_dispatcher/__init__.py

@@ -0,0 +1,10 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.

+ 71 - 0
tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py

@@ -0,0 +1,71 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from concurrent.futures import Executor, Future
+from typing import Optional
+
+from taipy import Config
+from taipy.config._serializer._toml_serializer import _TomlSerializer
+from taipy.core import Job
+from taipy.core._orchestrator._abstract_orchestrator import _AbstractOrchestrator
+
+from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
+from taipy.core._orchestrator._dispatcher._task_function_wrapper import _TaskFunctionWrapper
+
+
+class MockProcessPoolExecutor(Executor):
+    submit_called = []
+    f = []
+
+    def submit(self, fn, *args, **kwargs):
+        self.submit_called.append((fn, args, kwargs))
+        f = Future()
+        try:
+            result = fn(*args, **kwargs)
+        except BaseException as e:
+            f.set_exception(e)
+        else:
+            f.set_result(result)
+        self.f.append(f)
+        return f
+
+
+class MockStandaloneDispatcher(_StandaloneJobDispatcher):
+    def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
+        super(_StandaloneJobDispatcher, self).__init__(orchestrator)
+        self._executor = MockProcessPoolExecutor()
+        self.dispatch_calls = []
+        self.release_worker_calls = []
+        self.set_dispatch_processes_calls = []
+        self.pop_dispatch_processes_calls = []
+        self.update_job_status_from_future_calls = []
+
+    def mock_exception_for_job(self, task_id, e: Exception):
+        self.exceptions[task_id] = e
+
+    def _dispatch(self, job: Job):
+        self.dispatch_calls.append(job)
+        super()._dispatch(job)
+
+    def _set_dispatched_processes(self, job_id, future):
+        self.set_dispatch_processes_calls.append((job_id, future))
+        super()._set_dispatched_processes(job_id, future)
+
+    def _pop_dispatched_process(self, job_id, default=None):
+        self.pop_dispatch_processes_calls.append(job_id)
+        return super()._pop_dispatched_process(job_id, default)
+
+    def _release_worker(self, _):
+        self.release_worker_calls.append(None)
+        super()._release_worker(_)
+
+    def _update_job_status_from_future(self, job: Job, ft):
+        self.update_job_status_from_future_calls.append((job, ft))
+        super()._update_job_status_from_future(job, ft)

+ 63 - 0
tests/core/_orchestrator/_dispatcher/test_development_job_dispatcher.py

@@ -0,0 +1,63 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import traceback
+from unittest.mock import patch
+
+from taipy.core import JobId
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.job.job import Job
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+from taipy.core.task.task import Task
+
+
+def nothing(*args):
+    return
+
+
+def create_task():
+    task = Task("config_id", {}, nothing, [], [])
+    _TaskManagerFactory._build_manager()._set(task)
+    return task
+
+
+def test_dispatch_executes_the_function_no_exception():
+    task = create_task()
+    job = Job(JobId("job"), task, "s_id", task.id)
+    dispatcher = _OrchestratorFactory._build_dispatcher()
+
+    with patch("taipy.core._orchestrator._dispatcher._task_function_wrapper._TaskFunctionWrapper.execute") as mck:
+        mck.return_value = []
+        dispatcher._dispatch(job)
+
+        mck.assert_called_once()
+
+    assert job.is_completed()
+    assert job.stacktrace == []
+
+
+def test_dispatch_executes_the_function_with_exceptions():
+    task = create_task()
+    job = Job(JobId("job"), task, "s_id", task.id)
+    dispatcher = _OrchestratorFactory._build_dispatcher()
+    e_1 = Exception("test")
+    e_2 = Exception("test")
+
+    with patch("taipy.core._orchestrator._dispatcher._task_function_wrapper._TaskFunctionWrapper.execute") as mck:
+        mck.return_value = [e_1, e_2]
+        dispatcher._dispatch(job)
+
+        mck.assert_called_once()
+
+    assert len(job.stacktrace) == 2
+    assert job.stacktrace[1] == "".join(traceback.format_exception(type(e_2), value=e_2, tb=e_2.__traceback__))
+    assert job.stacktrace[0] == "".join(traceback.format_exception(type(e_1), value=e_1, tb=e_1.__traceback__))
+    assert job.is_failed()

+ 130 - 0
tests/core/_orchestrator/_dispatcher/test_dispatcher__execute_job.py

@@ -0,0 +1,130 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import multiprocessing
+import time
+from concurrent.futures import ProcessPoolExecutor
+from functools import partial
+from unittest import mock
+from unittest.mock import MagicMock
+
+from pytest import raises
+
+import taipy
+from taipy.config.config import Config
+from taipy.core import DataNodeId, JobId, TaskId
+from taipy.core._orchestrator._dispatcher import _JobDispatcher
+from taipy.core._orchestrator._dispatcher._development_job_dispatcher import _DevelopmentJobDispatcher
+from taipy.core._orchestrator._dispatcher._standalone_job_dispatcher import _StandaloneJobDispatcher
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.config.job_config import JobConfig
+from taipy.core.data._data_manager import _DataManager
+from taipy.core.job._job_manager_factory import _JobManagerFactory
+from taipy.core.job.job import Job
+from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+from taipy.core.task.task import Task
+from tests.core.utils import assert_true_after_time
+
+
+def nothing(*args):
+    return
+
+
+def create_scenario():
+    dn_cfg = Config.configure_pickle_data_node("dn")
+    t1_cfg = Config.configure_task("t1", nothing, [], [dn_cfg])
+    sc_conf = Config.configure_scenario("scenario_cfg", [t1_cfg])
+    return taipy.create_scenario(sc_conf)
+
+
+def test_can_execute():
+    dispatcher = _JobDispatcher(_OrchestratorFactory._orchestrator)
+    assert dispatcher._nb_available_workers == 1
+    assert dispatcher._can_execute()
+    dispatcher._nb_available_workers = 0
+    assert not dispatcher._can_execute()
+    dispatcher._nb_available_workers = -1
+    assert not dispatcher._can_execute()
+    dispatcher._nb_available_workers = 1
+    assert dispatcher._can_execute()
+
+
+def test_execute_job():
+    scenario = create_scenario()
+    scenario.t1.skippable = True  # make the job skippable
+    scenario.dn.lock_edit()  # lock output edit
+    job = Job(JobId("id"), scenario.t1, "submit_id", TaskId("id"))
+    _JobManagerFactory._build_manager()._set(job)
+    with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._dispatch") as mck_1:
+        with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._needs_to_run") as mck_2:
+            mck_2.return_value = True
+            dispatcher = _JobDispatcher(_OrchestratorFactory._build_orchestrator())
+            dispatcher._execute_job(job)
+
+            mck_2.assert_called_once_with(job.task)  # This should be called to check if job needs to run
+            mck_1.assert_called_once_with(job)
+            assert job.is_running()  # The job is not executed since the dispatch is mocked
+            assert scenario.dn.edit_in_progress  # outputs must NOT have been unlocked because the disptach is mocked
+
+
+def test_execute_job_to_skip():
+    scenario = create_scenario()
+    scenario.t1.skippable = True  # make the job skippable
+    scenario.dn.lock_edit()  # lock output edit
+    job = Job(JobId("id"), scenario.t1, "submit_id", TaskId("id"))
+    _JobManagerFactory._build_manager()._set(job)
+
+    with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._dispatch") as mck_1:
+        with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._needs_to_run") as mck_2:
+            mck_2.return_value = False
+            _JobDispatcher(_OrchestratorFactory._build_orchestrator())._execute_job(job)
+
+            assert job.is_skipped()
+            mck_1.assert_not_called()  # The job is expecting to be skipped, so it must not be dispatched
+            mck_2.assert_called_once_with(job.task)  # this must be called to check if the job needs to run
+            assert not scenario.dn.edit_in_progress  # outputs must have been unlocked
+
+
+def test_execute_job_skippable_with_force():
+    scenario = create_scenario()
+    scenario.t1.skippable = True  # make the job skippable
+    scenario.dn.lock_edit()  # lock output edit
+    job = Job(JobId("id"), scenario.t1, "submit_id", TaskId("id"), force=True)
+    _JobManagerFactory._build_manager()._set(job)
+
+    with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._dispatch") as mck_1:
+        with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._needs_to_run") as mck_2:
+            mck_2.return_value = False
+            dispatcher = _JobDispatcher(_OrchestratorFactory._orchestrator)
+            dispatcher._execute_job(job)
+
+            mck_1.assert_called_once_with(job)  # This should be called to dispatch the job
+            mck_2.assert_not_called()  # This should NOT be called since we force the execution anyway
+            assert job.is_running()  # The job is not executed since the dispatch is mocked
+            assert scenario.dn.edit_in_progress  # outputs must NOT have been unlocked because the disptach is mocked
+
+
+def test_execute_jobs_synchronously():
+    task = Task("config_id", {}, nothing, [], [])
+    _TaskManagerFactory._build_manager()._set(task)
+    job_1 = Job(JobId("job1"), task, "s_id", task.id)
+    job_2 = Job(JobId("job2"), task, "s_id", task.id)
+    _JobManagerFactory._build_manager()._set(job_1)
+    _JobManagerFactory._build_manager()._set(job_2)
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    orchestrator.jobs_to_run.put(job_1)
+    orchestrator.jobs_to_run.put(job_2)
+
+    with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._execute_job") as mck:
+        _JobDispatcher(orchestrator)._execute_jobs_synchronously()
+        assert mck.call_count == 2
+        mck.assert_called_with(job_2)

+ 103 - 0
tests/core/_orchestrator/_dispatcher/test_dispatcher__needs_to_run.py

@@ -0,0 +1,103 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from datetime import datetime, timedelta
+
+import freezegun
+
+from taipy.config import Config
+from taipy.core._orchestrator._dispatcher import _JobDispatcher
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.task._task_manager import _TaskManager
+
+
+def nothing(*args):
+    pass
+
+
+def _create_task_from_config(task_cfg):
+    return _TaskManager()._bulk_get_or_create([task_cfg])[0]
+
+
+def test_need_to_run_no_output():
+    hello_cfg = Config.configure_data_node("hello", default_data="Hello ")
+    world_cfg = Config.configure_data_node("world", default_data="world !")
+    task_cfg = Config.configure_task("name", input=[hello_cfg, world_cfg], function=nothing, output=[])
+    task = _create_task_from_config(task_cfg)
+    assert _JobDispatcher(_OrchestratorFactory._build_orchestrator())._needs_to_run(task)
+
+
+def test_need_to_run_task_not_skippable():
+    hello_cfg = Config.configure_data_node("hello", default_data="Hello ")
+    world_cfg = Config.configure_data_node("world", default_data="world !")
+    hello_world_cfg = Config.configure_data_node("hello_world")
+    task_cfg = Config.configure_task(
+        "name", input=[hello_cfg, world_cfg], function=nothing, output=[hello_world_cfg], skippable=False
+    )
+    task = _create_task_from_config(task_cfg)
+
+    assert _JobDispatcher(_OrchestratorFactory._build_orchestrator())._needs_to_run(task)
+
+
+def test_need_to_run_skippable_task_no_input():
+    hello_world_cfg = Config.configure_data_node("hello_world")
+    task_cfg = Config.configure_task("name", input=[], function=nothing, output=[hello_world_cfg], skippable=True)
+    task = _create_task_from_config(task_cfg)
+    dispatcher = _JobDispatcher(_OrchestratorFactory._build_orchestrator())
+    assert dispatcher._needs_to_run(task)  # output data is not written
+    task.output["hello_world"].write("Hello world !")
+    assert not dispatcher._needs_to_run(task)  # output data is written
+
+
+def test_need_to_run_skippable_task_no_validity_period_on_output():
+    hello_cfg = Config.configure_data_node("hello", default_data="Hello ")
+    output_cfg = Config.configure_data_node("output")
+    task_cfg = Config.configure_task("name", input=[hello_cfg], function=nothing, output=[output_cfg], skippable=True)
+    task = _create_task_from_config(task_cfg)
+    dispatcher = _JobDispatcher(_OrchestratorFactory._build_orchestrator())
+    assert dispatcher._needs_to_run(task)  # output data is not written
+    task.output["output"].write("Hello world !")
+    assert not dispatcher._needs_to_run(task)  # output data is written
+
+
+def test_need_to_run_skippable_task_with_validity_period_on_output():
+    hello_cfg = Config.configure_data_node("hello", default_data="Hello ")
+    hello_world_cfg = Config.configure_data_node("output", validity_period=timedelta(days=1))
+    task_cfg = Config.configure_task("name", nothing, [hello_cfg], [hello_world_cfg], skippable=True)
+    task = _create_task_from_config(task_cfg)
+    dispatcher = _JobDispatcher(_OrchestratorFactory._build_orchestrator())
+
+    assert dispatcher._needs_to_run(task)  # output data is not edited
+
+    output_edit_time = datetime.now()  # edit time
+    with freezegun.freeze_time(output_edit_time):
+        task.output["output"].write("Hello world !")  # output data is edited
+
+    with freezegun.freeze_time(output_edit_time + timedelta(minutes=30)):  # 30 min after edit time
+        assert not dispatcher._needs_to_run(task)  # output data is written and validity period not expired
+
+    with freezegun.freeze_time(output_edit_time + timedelta(days=1, seconds=1)):  # 1 day and 1 second after edit time
+        assert dispatcher._needs_to_run(task)  # output data is written but validity period expired
+
+
+def test_need_to_run_skippable_task_but_input_edited_after_output():
+    hello_cfg = Config.configure_data_node("input", default_data="Hello ")
+    hello_world_cfg = Config.configure_data_node("output")
+    task_cfg = Config.configure_task("name", nothing, [hello_cfg], [hello_world_cfg], skippable=True)
+    task = _create_task_from_config(task_cfg)
+    dispatcher = _JobDispatcher(_OrchestratorFactory._build_orchestrator())
+    output_edit_time = datetime.now()
+    with freezegun.freeze_time(output_edit_time):
+        task.data_nodes["output"].write("Hello world !")  # output data is edited at output_edit_time
+
+    with freezegun.freeze_time(output_edit_time + timedelta(minutes=30)):  # 30 min after output_edit_time
+        task.data_nodes["input"].write("Yellow !")
+        assert dispatcher._needs_to_run(task)  # output data is written but validity period expired

+ 61 - 0
tests/core/_orchestrator/_dispatcher/test_dispatcher__update_job_status.py

@@ -0,0 +1,61 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+import traceback
+
+from taipy import Job, Task, Status, JobId
+from taipy.core._orchestrator._dispatcher import _JobDispatcher
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.job._job_manager_factory import _JobManagerFactory
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+
+
+def nothing(*args):
+    pass
+
+
+def test_update_job_status_no_exception():
+    task = Task("config_id", {}, nothing)
+    _TaskManagerFactory._build_manager()._set(task)
+    job = Job(JobId("id"), task, "s_id", task.id)
+    _JobManagerFactory._build_manager()._set(job)
+
+    _JobDispatcher(_OrchestratorFactory._orchestrator)._update_job_status(job, None)
+
+    assert job.status == Status.COMPLETED
+    assert job.stacktrace == []
+
+
+def test_update_job_status_with_one_exception():
+    task = Task("config_id", {}, nothing)
+    _TaskManagerFactory._build_manager()._set(task)
+    job = Job(JobId("id"), task, "s_id", task.id)
+    _JobManagerFactory._build_manager()._set(job)
+    e = Exception("test")
+    _JobDispatcher(_OrchestratorFactory._orchestrator)._update_job_status(job, [e])
+
+    assert job.status == Status.FAILED
+    assert len(job.stacktrace) == 1
+    assert job.stacktrace[0] == "".join(traceback.format_exception(type(e), value=e, tb=e.__traceback__))
+
+
+def test_update_job_status_with_exceptions():
+    task = Task("config_id", {}, nothing)
+    _TaskManagerFactory._build_manager()._set(task)
+    job = Job(JobId("id"), task, "s_id", task.id)
+    _JobManagerFactory._build_manager()._set(job)
+    e_1 = Exception("test1")
+    e_2 = Exception("test2")
+    _JobDispatcher(_OrchestratorFactory._orchestrator)._update_job_status(job, [e_1, e_2])
+
+    assert job.status == Status.FAILED
+    assert len(job.stacktrace) == 2
+    assert job.stacktrace[0] == "".join(traceback.format_exception(type(e_1), value=e_1, tb=e_1.__traceback__))
+    assert job.stacktrace[1] == "".join(traceback.format_exception(type(e_2), value=e_2, tb=e_2.__traceback__))

+ 134 - 0
tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py

@@ -0,0 +1,134 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+import time
+from concurrent.futures import ProcessPoolExecutor, Future
+from unittest import mock
+from unittest.mock import call
+
+from taipy import Config
+from taipy.config._serializer._toml_serializer import _TomlSerializer
+from taipy.core import JobId
+from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.job._job_manager_factory import _JobManagerFactory
+from taipy.core.job.job import Job
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+from taipy.core.task.task import Task
+from tests.core._orchestrator._dispatcher.mock_standalone_dispatcher import MockStandaloneDispatcher
+from tests.core.utils import assert_true_after_time
+
+
+def nothing(*args):
+    return
+
+
+def create_task():
+    task = Task("config_id", {}, nothing, [], [])
+    _TaskManagerFactory._build_manager()._set(task)
+    return task
+
+
+def test_init_default():
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    job_dispatcher = _StandaloneJobDispatcher(orchestrator)
+
+    assert job_dispatcher.orchestrator == orchestrator
+    assert job_dispatcher.lock == orchestrator.lock
+    assert job_dispatcher._nb_available_workers == 1
+    assert isinstance(job_dispatcher._executor, ProcessPoolExecutor)
+
+
+def test_init_with_nb_workers():
+    Config.configure_job_executions(max_nb_of_workers=2)
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    job_dispatcher = _StandaloneJobDispatcher(orchestrator)
+
+    assert job_dispatcher._nb_available_workers == 2
+
+
+def test_dispatch_job():
+    task = create_task()
+    job = Job(JobId("job"), task, "s_id", task.id)
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    dispatcher = MockStandaloneDispatcher(orchestrator)
+
+    dispatcher._dispatch(job)
+
+    # test that the job execution is submitted to the executor
+    assert len(dispatcher.dispatch_calls) == 1
+    assert len(dispatcher._executor.submit_called) == 1
+    submit_first_call = dispatcher._executor.submit_called[0]
+    assert submit_first_call[0].job_id == job.id
+    assert submit_first_call[0].task == task
+    assert submit_first_call[1] == ()
+    assert submit_first_call[2]["config_as_string"] == _TomlSerializer()._serialize(Config._applied_config)
+
+    # test that the proc of the job is added to the list of dispatched jobs
+    assert len(dispatcher.set_dispatch_processes_calls) == 1
+    assert dispatcher.set_dispatch_processes_calls[0][0] == job.id
+    assert dispatcher.set_dispatch_processes_calls[0][1] == dispatcher._executor.f[0]
+
+    # test that the worker is released after the job is done
+    assert len(dispatcher.release_worker_calls) == 1
+
+    # test that the job status is updated after execution on future
+    assert len(dispatcher.update_job_status_from_future_calls) == 1
+    assert dispatcher.update_job_status_from_future_calls[0][0] == job
+    assert dispatcher.update_job_status_from_future_calls[0][1] == dispatcher._executor.f[0]
+
+
+def test_release_worker():
+    dispatcher = _StandaloneJobDispatcher(_OrchestratorFactory._orchestrator)
+
+    assert dispatcher._nb_available_workers == 1
+    dispatcher._release_worker(None)
+    assert dispatcher._nb_available_workers == 2
+    dispatcher._release_worker(None)
+    assert dispatcher._nb_available_workers == 3
+
+
+def test_update_job_status_from_future():
+    task = create_task()
+    job = Job(JobId("job"), task, "s_id", task.id)
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    dispatcher = _StandaloneJobDispatcher(orchestrator)
+    ft = Future()
+    ft.set_result(None)
+    dispatcher._set_dispatched_processes(job.id, ft)  # the job is dispatched to a process
+
+    dispatcher._update_job_status_from_future(job, ft)
+
+    assert len(dispatcher._dispatched_processes) == 0  # the job process is not stored anymore
+    assert job.is_completed()
+
+
+def test_run():
+    task = create_task()
+    job_1 = Job(JobId("job1"), task, "s_id", task.id)
+    job_2 = Job(JobId("job2"), task, "s_id", task.id)
+    job_3 = Job(JobId("job3"), task, "s_id", task.id)
+    job_4 = Job(JobId("job4"), task, "s_id", task.id)
+    _JobManagerFactory._build_manager()._set(job_1)
+    _JobManagerFactory._build_manager()._set(job_2)
+    _JobManagerFactory._build_manager()._set(job_3)
+    _JobManagerFactory._build_manager()._set(job_4)
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+    orchestrator.jobs_to_run.put(job_1)
+    orchestrator.jobs_to_run.put(job_2)
+    orchestrator.jobs_to_run.put(job_3)
+    orchestrator.jobs_to_run.put(job_4)
+
+    with mock.patch("taipy.core._orchestrator._dispatcher._job_dispatcher._JobDispatcher._execute_job") as mck:
+        dispatcher = _StandaloneJobDispatcher(orchestrator)
+        dispatcher.start()
+        assert_true_after_time(lambda: mck.call_count == 4, msg="The 4 jobs were not dequeued.", time=5)
+        dispatcher.stop()
+        mck.assert_has_calls([call(job_1), call(job_2), call(job_3), call(job_4)])

+ 133 - 0
tests/core/_orchestrator/_dispatcher/test_task_function_wrapper.py

@@ -0,0 +1,133 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import random
+import string
+
+from taipy.config import Config
+from taipy.config._serializer._toml_serializer import _TomlSerializer
+from taipy.config.common.scope import Scope
+from taipy.config.exceptions import ConfigurationUpdateBlocked
+from taipy.core._orchestrator._dispatcher._task_function_wrapper import _TaskFunctionWrapper
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.data._data_manager import _DataManager
+from taipy.core.task.task import Task
+
+
+def _create_task(function, nb_outputs=1):
+    output_dn_config_id = "".join(random.choice(string.ascii_lowercase) for _ in range(10))
+    dn_input_configs = [
+        Config.configure_data_node("input1", "pickle", Scope.SCENARIO, default_data=21),
+        Config.configure_data_node("input2", "pickle", Scope.SCENARIO, default_data=2),
+    ]
+    dn_output_configs = [
+        Config.configure_data_node(f"{output_dn_config_id}_output{i}", "pickle", Scope.SCENARIO, default_data=0)
+        for i in range(nb_outputs)
+    ]
+    input_dn = _DataManager._bulk_get_or_create(dn_input_configs).values()
+    output_dn = _DataManager._bulk_get_or_create(dn_output_configs).values()
+    return Task(
+        output_dn_config_id,
+        {},
+        function=function,
+        input=input_dn,
+        output=output_dn,
+    )
+
+
+def multiply(nb1: float, nb2: float):
+    return nb1 * nb2
+
+
+def test_execute_task_that_return_multiple_outputs():
+    def return_2tuple(nb1, nb2):
+        return multiply(nb1, nb2), multiply(nb1, nb2) / 2
+
+    def return_list(nb1, nb2):
+        return [multiply(nb1, nb2), multiply(nb1, nb2) / 2]
+
+    with_tuple = _create_task(return_2tuple, 2)
+    with_list = _create_task(return_list, 2)
+    _TaskFunctionWrapper("job_id_tuple", with_tuple).execute()
+    _TaskFunctionWrapper("job_id_list", with_list).execute()
+
+    assert (
+        with_tuple.output[f"{with_tuple.config_id}_output0"].read()
+        == with_list.output[f"{with_list.config_id}_output0"].read()
+        == 42
+    )
+    assert (
+        with_tuple.output[f"{with_tuple.config_id}_output1"].read()
+        == with_list.output[f"{with_list.config_id}_output1"].read()
+        == 21
+    )
+
+
+def test_execute_task_that_returns_single_iterable_output():
+    def return_2tuple(nb1, nb2):
+        return multiply(nb1, nb2), multiply(nb1, nb2) / 2
+
+    def return_list(nb1, nb2):
+        return [multiply(nb1, nb2), multiply(nb1, nb2) / 2]
+
+    task_with_tuple = _create_task(return_2tuple, 1)
+    task_with_list = _create_task(return_list, 1)
+    _TaskFunctionWrapper("job_id_tuple", task_with_tuple).execute()
+    _TaskFunctionWrapper("job_id_list", task_with_list).execute()
+
+    assert task_with_tuple.output[f"{task_with_tuple.config_id}_output0"].read() == (42, 21)
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+    assert task_with_list.output[f"{task_with_list.config_id}_output0"].read() == [42, 21]
+    assert len(_OrchestratorFactory._dispatcher._dispatched_processes) == 0
+
+
+def test_data_node_not_written_due_to_wrong_result_nb():
+    def fct_2_outputs():
+        return lambda nb1, nb2: (multiply(nb1, nb2), multiply(nb1, nb2) / 2)
+
+    task_expecting_3_outputs = _create_task(fct_2_outputs, 3)
+
+    exceptions = _TaskFunctionWrapper("job_id", task_expecting_3_outputs).execute()
+
+    assert len(exceptions) == 1
+    assert isinstance(exceptions[0], Exception)
+
+
+def test_cannot_exec_task_that_update_config():
+    def update_config_fct(n, m):
+        from taipy.config import Config
+
+        Config.core.storage_folder = ".new_storage_folder/"
+        return n * m
+
+    task_updating_cfg = _create_task(update_config_fct)
+    cfg_as_str = _TomlSerializer()._serialize(Config._applied_config)
+    res = _TaskFunctionWrapper("job_id", task_updating_cfg).execute(config_as_string=cfg_as_str)
+
+    assert len(res) == 1
+    assert isinstance(res[0], ConfigurationUpdateBlocked)
+
+
+def test_can_execute_task_with_a_modified_config():
+    def assert_config_is_correct_after_serialization(n, m):
+        from taipy.config import Config
+
+        assert Config.core.storage_folder == ".my_data/"
+        assert Config.core.custom_property == "custom_property"
+        return n * m
+
+    Config.configure_core(storage_folder=".my_data/", custom_property="custom_property")
+
+    task_asserting_cfg_is_correct = _create_task(assert_config_is_correct_after_serialization)
+    cfg_as_str = _TomlSerializer()._serialize(Config._applied_config)
+    res = _TaskFunctionWrapper("job_id", task_asserting_cfg_is_correct).execute(config_as_string=cfg_as_str)
+
+    assert len(res) == 0  # no exception raised so the asserts in the fct passed