Ver Fonte

Fix - Add wait when stop dispatcher (#801)

* fix: add wait when stop dispatcher to make sure the thread is fully terminated

* fix: remove unnecessary calls in tests

* fix: remove internal imports from the assert_true_after_time()

* fix: stop the dispatcher in the core conftest
Đỗ Trường Giang há 1 ano atrás
pai
commit
07a0fbc500

+ 6 - 3
taipy/core/_core.py

@@ -66,16 +66,19 @@ class Core:
 
         self.__start_dispatcher(force_restart)
 
-    def stop(self):
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
         """
         Stop the Core service.
-
         This function stops the dispatcher and unblock the Config for update.
+
+        Parameters:
+            wait (bool): If True, the method will wait for the dispatcher to stop.
+            timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
         """
         Config.unblock_update()
 
         if self._dispatcher:
-            self._dispatcher = _OrchestratorFactory._remove_dispatcher()
+            self._dispatcher = _OrchestratorFactory._remove_dispatcher(wait, timeout)
             self.__logger.info("Core service has been stopped.")
 
         with self.__class__.__lock_is_running:

+ 4 - 1
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -8,6 +8,9 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+
+from typing import Optional
+
 from ...job.job import Job
 from .._abstract_orchestrator import _AbstractOrchestrator
 from ._job_dispatcher import _JobDispatcher
@@ -26,7 +29,7 @@ class _DevelopmentJobDispatcher(_JobDispatcher):
     def is_running(self) -> bool:
         return True
 
-    def stop(self):
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
         raise NotImplementedError
 
     def run(self):

+ 13 - 5
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -12,7 +12,7 @@
 import threading
 from abc import abstractmethod
 from queue import Empty
-from typing import Dict
+from typing import Dict, Optional
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -47,12 +47,20 @@ class _JobDispatcher(threading.Thread):
         """Return True if the dispatcher is running"""
         return self.is_alive()
 
-    def stop(self):
-        """Stop the dispatcher"""
+    def stop(self, wait: bool = True, timeout: Optional[float] = None):
+        """Stop the dispatcher.
+
+        Parameters:
+            wait (bool): If True, the method will wait for the dispatcher to stop.
+            timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
+        """
         self._STOP_FLAG = True
+        if wait and self.is_alive():
+            self._logger.debug("Waiting for the dispatcher thread to stop...")
+            self.join(timeout=timeout)
 
     def run(self):
-        _TaipyLogger._get_logger().info("Start job dispatcher...")
+        self._logger.info("Start job dispatcher...")
         while not self._STOP_FLAG:
             try:
                 if self._can_execute():
@@ -64,7 +72,7 @@ class _JobDispatcher(threading.Thread):
             except Empty:  # In case the last job of the queue has been removed.
                 pass
             except Exception as e:
-                _TaipyLogger._get_logger().exception(e)
+                self._logger.exception(e)
                 pass
         self._logger.info("Job dispatcher stopped.")
 

+ 2 - 1
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -8,6 +8,7 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
 from typing import Callable, Optional
@@ -29,7 +30,7 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         max_workers = Config.job_config.max_nb_of_workers or 1
         self._executor: Executor = ProcessPoolExecutor(
             max_workers=max_workers,
-            initializer=subproc_initializer
+            initializer=subproc_initializer,
         )  # type: ignore
         self._nb_available_workers = self._executor._max_workers  # type: ignore
 

+ 2 - 2
taipy/core/_orchestrator/_orchestrator_factory.py

@@ -61,9 +61,9 @@ class _OrchestratorFactory:
         return cls._dispatcher
 
     @classmethod
-    def _remove_dispatcher(cls) -> Optional[_JobDispatcher]:
+    def _remove_dispatcher(cls, wait: bool = True, timeout: Optional[float] = None) -> None:
         if cls._dispatcher is not None and not isinstance(cls._dispatcher, _DevelopmentJobDispatcher):
-            cls._dispatcher.stop()
+            cls._dispatcher.stop(wait, timeout)
         cls._dispatcher = None
         return cls._dispatcher
 

+ 5 - 5
tests/core/_orchestrator/test_orchestrator.py

@@ -57,7 +57,7 @@ def test_submit_task_multithreading_multiple_task():
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_1:
         with lock_2:
@@ -104,7 +104,7 @@ def test_submit_submittable_multithreading_multiple_task():
 
     scenario = Scenario("scenario_config", [task_1, task_2], {})
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_1:
         with lock_2:
@@ -146,7 +146,7 @@ def test_submit_task_multithreading_multiple_task_in_sync_way_to_check_job_statu
     task_1 = _create_task(partial(lock_multiply, lock_1))
     task_2 = _create_task(partial(lock_multiply, lock_2))
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     with lock_0:
         submission_0 = _Orchestrator.submit_task(task_0)
@@ -215,7 +215,7 @@ def test_blocked_task():
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]
@@ -273,7 +273,7 @@ def test_blocked_submittable():
     bar_cfg = Config.configure_data_node("bar")
     baz_cfg = Config.configure_data_node("baz")
 
-    _OrchestratorFactory._build_dispatcher()
+    _OrchestratorFactory._build_dispatcher(force_restart=True)
 
     dns = _DataManager._bulk_get_or_create([foo_cfg, bar_cfg, baz_cfg])
     foo = dns[foo_cfg]

+ 6 - 3
tests/core/conftest.py

@@ -61,7 +61,6 @@ from taipy.core.task._task_manager_factory import _TaskManagerFactory
 from taipy.core.task.task import Task
 
 current_time = datetime.now()
-_OrchestratorFactory._build_orchestrator()
 
 
 @pytest.fixture(scope="function")
@@ -369,9 +368,11 @@ def init_managers():
 @pytest.fixture
 def init_orchestrator():
     def _init_orchestrator():
+        _OrchestratorFactory._remove_dispatcher()
+
         if _OrchestratorFactory._orchestrator is None:
             _OrchestratorFactory._build_orchestrator()
-        _OrchestratorFactory._build_dispatcher()
+        _OrchestratorFactory._build_dispatcher(force_restart=True)
         _OrchestratorFactory._orchestrator.jobs_to_run = Queue()
         _OrchestratorFactory._orchestrator.blocked_jobs = []
 
@@ -392,7 +393,7 @@ def sql_engine():
 
 
 @pytest.fixture
-def init_sql_repo(tmp_sqlite):
+def init_sql_repo(tmp_sqlite, init_managers):
     Config.configure_core(repository_type="sql", repository_properties={"db_location": tmp_sqlite})
 
     # Clean SQLite database
@@ -401,4 +402,6 @@ def init_sql_repo(tmp_sqlite):
         _SQLConnection._connection = None
     _SQLConnection.init_db()
 
+    init_managers()
+
     return tmp_sqlite

+ 0 - 6
tests/core/cycle/test_cycle_manager.py

@@ -14,8 +14,6 @@ from datetime import datetime
 from taipy.config.common.frequency import Frequency
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.cycle.cycle import Cycle
 from taipy.core.cycle.cycle_id import CycleId
@@ -190,8 +188,6 @@ def test_get_cycle_start_date_and_end_date():
 
 
 def test_hard_delete_shared_entities():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_config_1 = Config.configure_data_node("my_input_1", "pickle", scope=Scope.SCENARIO, default_data="testing")
     dn_config_2 = Config.configure_data_node("my_input_2", "pickle", scope=Scope.SCENARIO, default_data="testing")
     dn_config_3 = Config.configure_data_node("my_input_3", "pickle", scope=Scope.CYCLE, default_data="testing")
@@ -219,8 +215,6 @@ def test_hard_delete_shared_entities():
     scenario_config_2 = Config.configure_scenario("scenario_config_2", [task_config_2, task_config_3])
     scenario_config_2.add_sequences({"sequence_3": [task_config_3]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config_1)
     scenario_2 = _ScenarioManager._create(scenario_config_1)
     scenario_3 = _ScenarioManager._create(scenario_config_2)

+ 0 - 5
tests/core/cycle/test_cycle_manager_with_sql_repo.py

@@ -14,8 +14,6 @@ from datetime import datetime
 from taipy.config.common.frequency import Frequency
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.cycle._cycle_manager_factory import _CycleManagerFactory
 from taipy.core.cycle.cycle import Cycle
@@ -199,7 +197,6 @@ def test_get_cycle_start_date_and_end_date(init_sql_repo):
 
 
 def test_hard_delete_shared_entities(init_sql_repo):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
     _ScenarioManager._repository = _ScenarioManagerFactory._build_repository()
 
     dn_config_1 = Config.configure_data_node("my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing")
@@ -228,8 +225,6 @@ def test_hard_delete_shared_entities(init_sql_repo):
     )  # No Frequency so cycle attached to scenarios
     scenario_config_2.add_sequences({"sequence_3": [task_config_3]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config_1)
     scenario_2 = _ScenarioManager._create(scenario_config_1)
     scenario_3 = _ScenarioManager._create(scenario_config_2)

+ 0 - 28
tests/core/data/test_data_manager_with_sql_repo.py

@@ -19,7 +19,6 @@ from taipy.config.config import Config
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.config.data_node_config import DataNodeConfig
 from taipy.core.data._data_manager import _DataManager
-from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.csv import CSVDataNode
 from taipy.core.data.data_node_id import DataNodeId
 from taipy.core.data.in_memory import InMemoryDataNode
@@ -30,14 +29,8 @@ def file_exists(file_path: str) -> bool:
     return os.path.exists(file_path)
 
 
-def init_managers():
-    _DataManagerFactory._build_manager()._delete_all()
-
-
 class TestDataManager:
     def test_create_data_node_and_modify_properties_does_not_modify_config(self, init_sql_repo):
-        init_managers()
-
         dn_config = Config.configure_data_node(id="name", foo="bar")
         dn = _DataManager._create_and_set(dn_config, None, None)
         assert dn_config.properties.get("foo") == "bar"
@@ -51,23 +44,17 @@ class TestDataManager:
         assert dn.properties.get("baz") == "qux"
 
     def test_create_raises_exception_with_wrong_type(self, init_sql_repo):
-        init_managers()
-
         wrong_type_dn_config = DataNodeConfig(id="foo", storage_type="bar", scope=DataNodeConfig._DEFAULT_SCOPE)
         with pytest.raises(InvalidDataNodeType):
             _DataManager._create_and_set(wrong_type_dn_config, None, None)
 
     def test_create_from_same_config_generates_new_data_node_and_new_id(self, init_sql_repo):
-        init_managers()
-
         dn_config = Config.configure_data_node(id="foo", storage_type="in_memory")
         dn = _DataManager._create_and_set(dn_config, None, None)
         dn_2 = _DataManager._create_and_set(dn_config, None, None)
         assert dn_2.id != dn.id
 
     def test_create_uses_overridden_attributes_in_config_file(self, init_sql_repo):
-        init_managers()
-
         Config.override(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/config.toml"))
 
         csv_dn_cfg = Config.configure_data_node(id="foo", storage_type="csv", path="bar", has_header=True)
@@ -85,14 +72,10 @@ class TestDataManager:
         assert csv_dn.has_header
 
     def test_get_if_not_exists(self, init_sql_repo):
-        init_managers()
-
         with pytest.raises(ModelNotFound):
             _DataManager._repository._load("test_data_node_2")
 
     def test_get_all(self, init_sql_repo):
-        init_managers()
-
         _DataManager._delete_all()
         assert len(_DataManager._get_all()) == 0
         dn_config_1 = Config.configure_data_node(id="foo", storage_type="in_memory")
@@ -106,8 +89,6 @@ class TestDataManager:
         assert len([dn for dn in _DataManager._get_all() if dn.config_id == "baz"]) == 2
 
     def test_get_all_on_multiple_versions_environment(self, init_sql_repo):
-        init_managers()
-
         # Create 5 data nodes with 2 versions each
         # Only version 1.0 has the data node with config_id = "config_id_1"
         # Only version 2.0 has the data node with config_id = "config_id_6"
@@ -143,8 +124,6 @@ class TestDataManager:
         assert len(_DataManager._get_all_by(filters=[{"version": "2.0", "config_id": "config_id_6"}])) == 1
 
     def test_set(self, init_sql_repo):
-        init_managers()
-
         dn = InMemoryDataNode(
             "config_id",
             Scope.SCENARIO,
@@ -171,7 +150,6 @@ class TestDataManager:
         assert _DataManager._get(dn.id).config_id == "foo"
 
     def test_delete(self, init_sql_repo):
-        init_managers()
         _DataManager._delete_all()
 
         dn_1 = InMemoryDataNode("config_id", Scope.SCENARIO, id="id_1")
@@ -198,8 +176,6 @@ class TestDataManager:
         def _get_or_create_dn(config, *args):
             return _DataManager._bulk_get_or_create([config], *args)[config]
 
-        init_managers()
-
         global_dn_config = Config.configure_data_node(
             id="test_data_node", storage_type="in_memory", scope=Scope.GLOBAL, data="In memory Data Node"
         )
@@ -259,8 +235,6 @@ class TestDataManager:
         assert cycle_dn_4.id == cycle_dn_5.id
 
     def test_get_data_nodes_by_config_id(self, init_sql_repo):
-        init_managers()
-
         dn_config_1 = Config.configure_data_node("dn_1", scope=Scope.SCENARIO)
         dn_config_2 = Config.configure_data_node("dn_2", scope=Scope.SCENARIO)
         dn_config_3 = Config.configure_data_node("dn_3", scope=Scope.SCENARIO)
@@ -290,8 +264,6 @@ class TestDataManager:
         assert sorted([dn_3_1.id]) == sorted([sequence.id for sequence in dn_3_datanodes])
 
     def test_get_data_nodes_by_config_id_in_multiple_versions_environment(self, init_sql_repo):
-        init_managers()
-
         dn_config_1 = Config.configure_data_node("dn_1", scope=Scope.SCENARIO)
         dn_config_2 = Config.configure_data_node("dn_2", scope=Scope.SCENARIO)
 

+ 0 - 6
tests/core/data/test_data_node.py

@@ -20,8 +20,6 @@ import taipy.core as tp
 from taipy.config import Config
 from taipy.config.common.scope import Scope
 from taipy.config.exceptions.exceptions import InvalidConfigurationId
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.data_node import DataNode
 from taipy.core.data.data_node_id import DataNodeId
@@ -355,8 +353,6 @@ class TestDataNode:
         assert not dn_3.is_up_to_date
 
     def test_do_not_recompute_data_node_valid_but_continue_sequence_execution(self):
-        Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
         a = Config.configure_data_node("A", "pickle", default_data="A")
         b = Config.configure_data_node("B", "pickle")
         c = Config.configure_data_node("C", "pickle")
@@ -367,8 +363,6 @@ class TestDataNode:
         task_b_d = Config.configure_task("task_b_d", funct_b_d, input=b, output=d)
         scenario_cfg = Config.configure_scenario("scenario", [task_a_b, task_b_c, task_b_d])
 
-        _OrchestratorFactory._build_dispatcher()
-
         scenario = tp.create_scenario(scenario_cfg)
         scenario.submit()
         assert scenario.A.read() == "A"

+ 8 - 9
tests/core/job/test_job.py

@@ -305,9 +305,16 @@ def test_auto_set_and_reload(current_datetime, job_id):
     assert not job_1._is_in_context
 
 
+def test_is_deletable():
+    with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit:
+        task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
+        job = Job(job_id, task, "submit_id_1", "scenario_entity_id")
+        job.is_deletable()
+        mock_submit.assert_called_once_with(job)
+
+
 def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE):
     Config.configure_job_executions(mode=mode)
-    _OrchestratorFactory._build_dispatcher()
     _TaskManager._set(task)
     _JobManager._set(job)
     dispatcher: Union[_StandaloneJobDispatcher, _DevelopmentJobDispatcher] = _StandaloneJobDispatcher(
@@ -316,11 +323,3 @@ def _dispatch(task: Task, job: Job, mode=JobConfig._DEVELOPMENT_MODE):
     if mode == JobConfig._DEVELOPMENT_MODE:
         dispatcher = _DevelopmentJobDispatcher(cast(_AbstractOrchestrator, _OrchestratorFactory._orchestrator))
     dispatcher._dispatch(job)
-
-
-def test_is_deletable():
-    with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit:
-        task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
-        job = Job(job_id, task, "submit_id_1", "scenario_entity_id")
-        job.is_deletable()
-        mock_submit.assert_called_once_with(job)

+ 1 - 10
tests/core/job/test_job_manager.py

@@ -50,8 +50,6 @@ def test_create_jobs():
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
     task = _create_task(multiply, name="get_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _JobManager._create(task, [print], "submit_id", "secnario_id", True)
     assert _JobManager._get(job_1.id) == job_1
     assert job_1.is_submitted()
@@ -78,8 +76,6 @@ def test_get_job():
 
     task = _create_task(multiply, name="get_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get(job_1.id) == job_1
     assert _JobManager._get(job_1.id).submit_entity_id == task.id
@@ -97,8 +93,6 @@ def test_get_latest_job():
     task = _create_task(multiply, name="get_latest_job")
     task_2 = _create_task(multiply, name="get_latest_job_2")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task) == job_1
     assert _JobManager._get_latest(task_2) is None
@@ -123,8 +117,6 @@ def test_get_jobs():
 
     task = _create_task(multiply, name="get_all_jobs")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
@@ -136,8 +128,6 @@ def test_delete_job():
 
     task = _create_task(multiply, name="delete_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
@@ -479,6 +469,7 @@ def test_is_deletable():
     assert not _JobManager._is_deletable(job)
     assert not _JobManager._is_deletable(job.id)
 
+
 def _create_task(function, nb_outputs=1, name=None):
     input1_dn_config = Config.configure_data_node("input1", "pickle", Scope.SCENARIO, default_data=21)
     input2_dn_config = Config.configure_data_node("input2", "pickle", Scope.SCENARIO, default_data=2)

+ 0 - 29
tests/core/job/test_job_manager_with_sql_repo.py

@@ -28,11 +28,9 @@ from taipy.core.data._data_manager import _DataManager
 from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.exceptions.exceptions import JobNotDeletedException
 from taipy.core.job._job_manager import _JobManager
-from taipy.core.job._job_manager_factory import _JobManagerFactory
 from taipy.core.job.job_id import JobId
 from taipy.core.job.status import Status
 from taipy.core.task._task_manager import _TaskManager
-from taipy.core.task._task_manager_factory import _TaskManagerFactory
 from tests.core.utils import assert_true_after_time
 
 
@@ -45,20 +43,11 @@ def lock_multiply(lock, nb1: float, nb2: float):
         return multiply(nb1 or 1, nb2 or 2)
 
 
-def init_managers():
-    _TaskManagerFactory._build_manager()._delete_all()
-    _DataManagerFactory._build_manager()._delete_all()
-    _JobManagerFactory._build_manager()._delete_all()
-
-
 def test_create_jobs(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    init_managers()
 
     task = _create_task(multiply, name="get_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _JobManager._create(task, [print], "submit_id", "secnario_id", True)
     assert _JobManager._get(job_1.id) == job_1
     assert job_1.is_submitted()
@@ -80,12 +69,9 @@ def test_create_jobs(init_sql_repo):
 
 def test_get_job(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    init_managers()
 
     task = _create_task(multiply, name="get_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get(job_1.id) == job_1
     assert _JobManager._get(job_1.id).submit_entity_id == task.id
@@ -99,13 +85,10 @@ def test_get_job(init_sql_repo):
 
 def test_get_latest_job(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    init_managers()
 
     task = _create_task(multiply, name="get_latest_job")
     task_2 = _create_task(multiply, name="get_latest_job_2")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     assert _JobManager._get_latest(task) == job_1
     assert _JobManager._get_latest(task_2) is None
@@ -122,18 +105,14 @@ def test_get_latest_job(init_sql_repo):
 
 
 def test_get_job_unknown(init_sql_repo):
-    init_managers()
     assert _JobManager._get(JobId("Unknown")) is None
 
 
 def test_get_jobs(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    init_managers()
 
     task = _create_task(multiply, name="get_all_jobs")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
@@ -143,12 +122,8 @@ def test_get_jobs(init_sql_repo):
 def test_delete_job(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
 
-    init_managers()
-
     task = _create_task(multiply, name="delete_job")
 
-    _OrchestratorFactory._build_dispatcher()
-
     job_1 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
     job_2 = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]
 
@@ -160,7 +135,6 @@ def test_delete_job(init_sql_repo):
 
 def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-    init_managers()
 
     m = multiprocessing.Manager()
     lock = m.Lock()
@@ -189,7 +163,6 @@ def test_raise_when_trying_to_delete_unfinished_job(init_sql_repo):
 
 def test_force_deleting_unfinished_job(init_sql_repo):
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
-    init_managers()
 
     m = multiprocessing.Manager()
     lock = m.Lock()
@@ -217,8 +190,6 @@ def test_force_deleting_unfinished_job(init_sql_repo):
 
 
 def test_is_deletable(init_sql_repo):
-    init_managers()
-
     assert len(_JobManager._get_all()) == 0
     task = _create_task(print, 0, "task")
     job = _OrchestratorFactory._orchestrator.submit_task(task).jobs[0]

+ 1 - 67
tests/core/scenario/test_scenario_manager.py

@@ -20,11 +20,9 @@ from taipy.config.common.scope import Scope
 from taipy.config.config import Config
 from taipy.core import Job
 from taipy.core._orchestrator._orchestrator import _Orchestrator
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.common import _utils
 from taipy.core.common._utils import _Subscriber
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.in_memory import InMemoryDataNode
@@ -52,9 +50,6 @@ from tests.core.utils.NotifyMock import NotifyMock
 
 
 def test_set_and_get_scenario(cycle):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_id_1 = ScenarioId("scenario_id_1")
     scenario_1 = Scenario("scenario_name_1", [], {}, [], scenario_id_1)
 
@@ -278,16 +273,12 @@ def test_get_all_on_multiple_versions_environment():
 
 
 def test_create_scenario_does_not_modify_config():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     creation_date_1 = datetime.now()
     name_1 = "name_1"
     scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY)
     assert scenario_config.properties.get("name") is None
     assert len(scenario_config.properties) == 0
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1)
     assert len(scenario_config.properties) == 0
     assert len(scenario.properties) == 1
@@ -307,8 +298,6 @@ def test_create_scenario_does_not_modify_config():
 
 
 def test_create_and_delete_scenario():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     creation_date_1 = datetime.now()
     creation_date_2 = creation_date_1 + timedelta(minutes=10)
 
@@ -319,8 +308,6 @@ def test_create_and_delete_scenario():
 
     scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY)
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1)
     assert scenario_1.config_id == "sc"
     assert scenario_1.sequences == {}
@@ -531,8 +518,6 @@ def mult_by_4(nb: int):
 
 
 def test_scenario_manager_only_creates_data_node_once():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     # dn_1 ---> mult_by_2 ---> dn_2 ---> mult_by_3 ---> dn_6
     # dn_1 ---> mult_by_4 ---> dn_4
 
@@ -550,8 +535,6 @@ def test_scenario_manager_only_creates_data_node_once():
         {"by_6": [task_mult_by_2_config, task_mult_by_3_config], "by_4": [task_mult_by_4_config]}
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert len(_DataManager._get_all()) == 0
     assert len(_TaskManager._get_all()) == 0
     assert len(_SequenceManager._get_all()) == 0
@@ -588,8 +571,6 @@ def test_scenario_manager_only_creates_data_node_once():
 
 
 def test_notification_subscribe(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     scenario_config = Config.configure_scenario(
@@ -604,8 +585,6 @@ def test_notification_subscribe(mocker):
         ],
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config)
 
     notify_1 = NotifyMock(scenario)
@@ -639,8 +618,6 @@ class Notify:
 
 
 def test_notification_subscribe_multiple_params(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     scenario_config = Config.configure_scenario(
@@ -656,8 +633,6 @@ def test_notification_subscribe_multiple_params(mocker):
     )
     notify = mocker.Mock()
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config)
     _ScenarioManager._subscribe(callback=notify, params=["foobar", 123, 1.2], scenario=scenario)
     mocker.patch.object(_ScenarioManager, "_get", return_value=scenario)
@@ -680,8 +655,6 @@ def notify2(*args, **kwargs):
 
 
 def test_notification_unsubscribe(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     scenario_config = Config.configure_scenario(
@@ -696,8 +669,6 @@ def test_notification_unsubscribe(mocker):
         ],
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config)
 
     notify_1 = notify1
@@ -715,8 +686,6 @@ def test_notification_unsubscribe(mocker):
 
 
 def test_notification_unsubscribe_multi_param():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     scenario_config = Config.configure_scenario(
         "awesome_scenario",
         [
@@ -729,8 +698,6 @@ def test_notification_unsubscribe_multi_param():
         ],
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config)
 
     # test subscribing notification
@@ -756,8 +723,6 @@ def test_notification_unsubscribe_multi_param():
 
 
 def test_scenario_notification_subscribe_all():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     scenario_config = Config.configure_scenario(
         "awesome_scenario",
         [
@@ -780,7 +745,7 @@ def test_scenario_notification_subscribe_all():
             )
         ],
     )
-    _OrchestratorFactory._build_dispatcher()
+
     scenario = _ScenarioManager._create(scenario_config)
     other_scenario = _ScenarioManager._create(other_scenario_config)
     notify_1 = NotifyMock(scenario)
@@ -816,9 +781,6 @@ def test_is_promotable_to_primary_scenario():
 
 
 def test_get_set_primary_scenario():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     cycle_1 = _CycleManager._create(Frequency.DAILY, name="foo")
 
     scenario_1 = Scenario("sc_1", [], {}, ScenarioId("sc_1"), is_primary=False, cycle=cycle_1)
@@ -852,16 +814,12 @@ def test_get_set_primary_scenario():
 
 
 def test_hard_delete_one_single_scenario_with_scenario_data_nodes():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
     scenario_config = Config.configure_scenario("scenario_config", [task_config])
     scenario_config.add_sequences({"sequence_config": [task_config]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario = _ScenarioManager._create(scenario_config)
     _ScenarioManager._submit(scenario.id)
 
@@ -879,16 +837,12 @@ def test_hard_delete_one_single_scenario_with_scenario_data_nodes():
 
 
 def test_hard_delete_one_scenario_among_two_with_scenario_data_nodes():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
     scenario_config = Config.configure_scenario("scenario_config", [task_config])
     scenario_config.add_sequences({"sequence_config": [task_config]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config)
     scenario_2 = _ScenarioManager._create(scenario_config)
     _ScenarioManager._submit(scenario_1.id)
@@ -909,16 +863,12 @@ def test_hard_delete_one_scenario_among_two_with_scenario_data_nodes():
 
 
 def test_hard_delete_one_scenario_among_two_with_cycle_data_nodes():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
     scenario_config = Config.configure_scenario("scenario_config", [task_config])
     scenario_config.add_sequences({"sequence_config": [task_config]})
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config)
     scenario_2 = _ScenarioManager._create(scenario_config)
     _ScenarioManager._submit(scenario_1.id)
@@ -939,8 +889,6 @@ def test_hard_delete_one_scenario_among_two_with_cycle_data_nodes():
 
 
 def test_hard_delete_shared_entities():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_config_1 = Config.configure_data_node("my_input_1", "in_memory", scope=Scope.CYCLE, default_data="testing")
     dn_config_2 = Config.configure_data_node("my_input_2", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     dn_config_3 = Config.configure_data_node("my_input_3", "in_memory", scope=Scope.GLOBAL, default_data="testing")
@@ -963,8 +911,6 @@ def test_hard_delete_shared_entities():
         }
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config_1)
     scenario_2 = _ScenarioManager._create(scenario_config_1)
     scenario_1.submit()
@@ -1008,9 +954,6 @@ def test_is_submittable():
 
 
 def test_submit():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
     data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
     data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3")
@@ -1179,8 +1122,6 @@ def test_scenarios_comparison():
         comparators={"bar": [subtraction], "foo": [subtraction, addition]},
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert scenario_config.comparators is not None
     scenario_1 = _ScenarioManager._create(scenario_config)
     scenario_2 = _ScenarioManager._create(scenario_config)
@@ -1213,9 +1154,6 @@ def test_scenarios_comparison():
 
 
 def test_tags():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     cycle_1 = _CycleManager._create(Frequency.DAILY, name="today", creation_date=datetime.now())
     cycle_2 = _CycleManager._create(
         Frequency.DAILY,
@@ -1353,13 +1291,9 @@ def test_tags():
 
 
 def test_authorized_tags():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     scenario = Scenario("scenario_1", [], {"authorized_tags": ["foo", "bar"]}, [], ScenarioId("scenario_1"))
     scenario_2_cfg = Config.configure_scenario("scenario_2", [], [], Frequency.DAILY, authorized_tags=["foo", "bar"])
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_2 = _ScenarioManager._create(scenario_2_cfg)
     _ScenarioManager._set(scenario)
 

+ 7 - 38
tests/core/scenario/test_scenario_manager_with_sql_repo.py

@@ -16,9 +16,7 @@ import pytest
 from taipy.config.common.frequency import Frequency
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._version._version_manager import _VersionManager
-from taipy.core.config.job_config import JobConfig
 from taipy.core.cycle._cycle_manager import _CycleManager
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.in_memory import InMemoryDataNode
@@ -32,12 +30,7 @@ from taipy.core.task.task import Task
 from taipy.core.task.task_id import TaskId
 
 
-def test_set_and_get_scenario(cycle, init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-    _OrchestratorFactory._build_dispatcher()
-
+def test_set_and_get_scenario(cycle, init_sql_repo):
     scenario_id_1 = ScenarioId("scenario_id_1")
     scenario_1 = Scenario("scenario_name_1", [], {}, [], scenario_id_1)
 
@@ -200,9 +193,7 @@ def test_set_and_get_scenario(cycle, init_sql_repo, init_managers):
     assert _TaskManager._get(task_2.id).id == task_2.id
 
 
-def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers):
-    init_managers()
-
+def test_get_all_on_multiple_versions_environment(init_sql_repo):
     # Create 5 scenarios with 2 versions each
     # Only version 1.0 has the scenario with config_id = "config_id_1"
     # Only version 2.0 has the scenario with config_id = "config_id_6"
@@ -233,17 +224,11 @@ def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers):
     assert len(_ScenarioManager._get_all_by(filters=[{"version": "2.0", "config_id": "config_id_6"}])) == 1
 
 
-def test_create_scenario_does_not_modify_config(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_create_scenario_does_not_modify_config(init_sql_repo):
     creation_date_1 = datetime.now()
     name_1 = "name_1"
     scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY)
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert scenario_config.properties.get("name") is None
     assert len(scenario_config.properties) == 0
 
@@ -265,11 +250,7 @@ def test_create_scenario_does_not_modify_config(init_sql_repo, init_managers):
     assert scenario_2.name is None
 
 
-def test_create_and_delete_scenario(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_create_and_delete_scenario(init_sql_repo):
     creation_date_1 = datetime.now()
     creation_date_2 = creation_date_1 + timedelta(minutes=10)
 
@@ -280,8 +261,6 @@ def test_create_and_delete_scenario(init_sql_repo, init_managers):
 
     scenario_config = Config.configure_scenario("sc", None, None, Frequency.DAILY)
 
-    _OrchestratorFactory._build_dispatcher()
-
     scenario_1 = _ScenarioManager._create(scenario_config, creation_date=creation_date_1, name=name_1)
     assert scenario_1.config_id == "sc"
     assert scenario_1.sequences == {}
@@ -350,11 +329,7 @@ def mult_by_4(nb: int):
     return nb * 4
 
 
-def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_scenario_manager_only_creates_data_node_once(init_sql_repo):
     # dn_1 ---> mult_by_2 ---> dn_2 ---> mult_by_3 ---> dn_6
     # dn_1 ---> mult_by_4 ---> dn_4
 
@@ -372,8 +347,6 @@ def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_manage
         {"by_6": [task_mult_by_2_config, task_mult_by_3_config], "by_4": [task_mult_by_4_config]}
     )
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert len(_DataManager._get_all()) == 0
     assert len(_TaskManager._get_all()) == 0
     assert len(_SequenceManager._get_all()) == 0
@@ -409,9 +382,7 @@ def test_scenario_manager_only_creates_data_node_once(init_sql_repo, init_manage
     assert len(_ScenarioManager._get_all()) == 2
 
 
-def test_get_scenarios_by_config_id(init_sql_repo, init_managers):
-    init_managers()
-
+def test_get_scenarios_by_config_id(init_sql_repo):
     scenario_config_1 = Config.configure_scenario("s1", sequence_configs=[])
     scenario_config_2 = Config.configure_scenario("s2", sequence_configs=[])
     scenario_config_3 = Config.configure_scenario("s3", sequence_configs=[])
@@ -441,9 +412,7 @@ def test_get_scenarios_by_config_id(init_sql_repo, init_managers):
     assert sorted([s_3_1.id]) == sorted([scenario.id for scenario in s3_scenarios])
 
 
-def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo, init_managers):
-    init_managers()
-
+def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo):
     scenario_config_1 = Config.configure_scenario("s1", sequence_configs=[])
     scenario_config_2 = Config.configure_scenario("s2", sequence_configs=[])
 

+ 0 - 46
tests/core/sequence/test_sequence_manager.py

@@ -20,11 +20,9 @@ import pytest
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
 from taipy.core._orchestrator._orchestrator import _Orchestrator
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.common import _utils
 from taipy.core.common._utils import _Subscriber
-from taipy.core.config.job_config import JobConfig
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions.exceptions import (
@@ -73,8 +71,6 @@ def test_raise_sequence_does_not_belong_to_scenario():
 
 
 def __init():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
     input_dn = InMemoryDataNode("foo", Scope.SCENARIO)
     output_dn = InMemoryDataNode("foo", Scope.SCENARIO)
     task = Task("task", {}, print, [input_dn], [output_dn], TaskId("task_id"))
@@ -217,9 +213,6 @@ def test_is_submittable():
 
 
 def test_submit():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     data_node_1 = InMemoryDataNode("foo", Scope.SCENARIO, "s1")
     data_node_2 = InMemoryDataNode("bar", Scope.SCENARIO, "s2")
     data_node_3 = InMemoryDataNode("baz", Scope.SCENARIO, "s3")
@@ -328,9 +321,6 @@ def mock_function_no_input_one_output():
 
 
 def test_submit_sequence_from_tasks_with_one_or_no_input_output():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_dispatcher()
-
     # test no input and no output Task
     task_no_input_no_output = Task("task_no_input_no_output", {}, mock_function_no_input_no_output)
     scenario_1 = Scenario("scenario_1", {task_no_input_no_output}, {})
@@ -397,8 +387,6 @@ def mult_by_3(nb: int):
 
 def test_get_or_create_data():
     # only create intermediate data node once
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)
     dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0)
     dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0)
@@ -408,8 +396,6 @@ def test_get_or_create_data():
     # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6
     scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3])
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert len(_DataManager._get_all()) == 0
     assert len(_TaskManager._get_all()) == 0
 
@@ -460,8 +446,6 @@ def notify_multi_param(*args, **kwargs):
 
 
 def test_sequence_notification_subscribe(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     task_configs = [
@@ -473,8 +457,6 @@ def test_sequence_notification_subscribe(mocker):
         )
     ]
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create(task_configs=task_configs)
     scenario = Scenario("scenario", set(tasks), {}, sequences={"by_1": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -514,8 +496,6 @@ def test_sequence_notification_subscribe(mocker):
 
 
 def test_sequence_notification_subscribe_multi_param(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     task_configs = [
@@ -527,8 +507,6 @@ def test_sequence_notification_subscribe_multi_param(mocker):
         )
     ]
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create(task_configs)
     scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -549,8 +527,6 @@ def test_sequence_notification_subscribe_multi_param(mocker):
 
 
 def test_sequence_notification_unsubscribe(mocker):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     mocker.patch("taipy.core._entity._reload._Reloader._reload", side_effect=lambda m, o: o)
 
     task_configs = [
@@ -562,8 +538,6 @@ def test_sequence_notification_unsubscribe(mocker):
         )
     ]
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create(task_configs)
     scenario = Scenario("scenario", set(tasks), {}, sequences={"by_6": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -584,8 +558,6 @@ def test_sequence_notification_unsubscribe(mocker):
 
 
 def test_sequence_notification_unsubscribe_multi_param():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     task_configs = [
         Config.configure_task(
             "mult_by_two",
@@ -595,8 +567,6 @@ def test_sequence_notification_unsubscribe_multi_param():
         )
     ]
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create(task_configs)
     scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -622,8 +592,6 @@ def test_sequence_notification_unsubscribe_multi_param():
 
 
 def test_sequence_notification_subscribe_all():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     task_configs = [
         Config.configure_task(
             "mult_by_two",
@@ -633,8 +601,6 @@ def test_sequence_notification_subscribe_all():
         )
     ]
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create(task_configs)
     scenario = Scenario("scenario", tasks, {}, sequences={"by_6": {"tasks": tasks}, "other_sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -802,14 +768,10 @@ def test_export(tmpdir_factory):
 
 
 def test_hard_delete_one_single_sequence_with_scenario_data_nodes():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -831,14 +793,10 @@ def test_hard_delete_one_single_sequence_with_scenario_data_nodes():
 
 
 def test_hard_delete_one_single_sequence_with_cycle_data_nodes():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -860,16 +818,12 @@ def test_hard_delete_one_single_sequence_with_cycle_data_nodes():
 
 
 def test_hard_delete_shared_entities():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
     input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
     task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1")
     tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2")
 

+ 6 - 39
tests/core/sequence/test_sequence_manager_with_sql_repo.py

@@ -13,9 +13,7 @@ import pytest
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core._version._version_manager import _VersionManager
-from taipy.core.config.job_config import JobConfig
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions import SequenceAlreadyExists
@@ -29,12 +27,7 @@ from taipy.core.task.task import Task
 from taipy.core.task.task_id import TaskId
 
 
-def test_set_and_get_sequence(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-    _OrchestratorFactory._build_dispatcher()
-
+def test_set_and_get_sequence(init_sql_repo):
     input_dn = InMemoryDataNode("foo", Scope.SCENARIO)
     output_dn = InMemoryDataNode("foo", Scope.SCENARIO)
     task = Task("task", {}, print, [input_dn], [output_dn], TaskId("task_id"))
@@ -89,9 +82,7 @@ def test_set_and_get_sequence(init_sql_repo, init_managers):
     assert len(_SequenceManager._get(sequence_2).tasks) == 1
 
 
-def test_get_all_on_multiple_versions_environment(init_sql_repo, init_managers):
-    init_managers()
-
+def test_get_all_on_multiple_versions_environment(init_sql_repo):
     # Create 5 sequences from Scenario with 2 versions each
     for version in range(1, 3):
         for i in range(5):
@@ -152,12 +143,8 @@ def mult_by_3(nb: int):
     return nb * 3
 
 
-def test_get_or_create_data(init_sql_repo, init_managers):
+def test_get_or_create_data(init_sql_repo):
     # only create intermediate data node once
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
     dn_config_1 = Config.configure_data_node("foo", "in_memory", Scope.SCENARIO, default_data=1)
     dn_config_2 = Config.configure_data_node("bar", "in_memory", Scope.SCENARIO, default_data=0)
     dn_config_6 = Config.configure_data_node("baz", "in_memory", Scope.SCENARIO, default_data=0)
@@ -167,8 +154,6 @@ def test_get_or_create_data(init_sql_repo, init_managers):
     # dn_1 ---> mult_by_two ---> dn_2 ---> mult_by_3 ---> dn_6
     scenario_config = Config.configure_scenario("scenario", [task_config_mult_by_two, task_config_mult_by_3])
 
-    _OrchestratorFactory._build_dispatcher()
-
     assert len(_DataManager._get_all()) == 0
     assert len(_TaskManager._get_all()) == 0
 
@@ -206,17 +191,11 @@ def test_get_or_create_data(init_sql_repo, init_managers):
         sequence.WRONG.write(7)
 
 
-def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo):
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", set(tasks), {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -237,17 +216,11 @@ def test_hard_delete_one_single_sequence_with_scenario_data_nodes(init_sql_repo,
     assert len(_JobManager._get_all()) == 1
 
 
-def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo):
     dn_input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.CYCLE, default_data="testing")
     dn_output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.CYCLE)
     task_config = Config.configure_task("task_config", print, dn_input_config, dn_output_config)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks = _TaskManager._bulk_get_or_create([task_config])
     scenario = Scenario("scenario", tasks, {}, sequences={"sequence": {"tasks": tasks}})
     _ScenarioManager._set(scenario)
@@ -268,19 +241,13 @@ def test_hard_delete_one_single_sequence_with_cycle_data_nodes(init_sql_repo, in
     assert len(_JobManager._get_all()) == 1
 
 
-def test_hard_delete_shared_entities(init_sql_repo, init_managers):
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-
-    init_managers()
-
+def test_hard_delete_shared_entities(init_sql_repo):
     input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
     intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, default_data="testing")
     task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
     task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
 
-    _OrchestratorFactory._build_dispatcher()
-
     tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1")
     tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2")
 

+ 0 - 17
tests/core/submission/test_submission_manager_with_sql_repo.py

@@ -22,14 +22,7 @@ from taipy.core.submission.submission import Submission
 from taipy.core.submission.submission_status import SubmissionStatus
 
 
-def init_managers():
-    _VersionManagerFactory._build_manager()._delete_all()
-    _SubmissionManagerFactory._build_manager()._delete_all()
-
-
 def test_create_submission(scenario, init_sql_repo):
-    init_managers()
-
     submission_1 = _SubmissionManagerFactory._build_manager()._create(
         scenario.id, scenario._ID_PREFIX, scenario.config_id, debug=True, log="log_file", retry_note=5
     )
@@ -44,8 +37,6 @@ def test_create_submission(scenario, init_sql_repo):
 
 
 def test_get_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission_1 = submission_manager._create(
@@ -63,8 +54,6 @@ def test_get_submission(init_sql_repo):
 
 
 def test_get_all_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
     version_manager = _VersionManagerFactory._build_manager()
 
@@ -86,8 +75,6 @@ def test_get_all_submission(init_sql_repo):
 
 
 def test_get_latest_submission(init_sql_repo):
-    init_managers()
-
     task_1 = Task("task_config_1", {}, print, id="task_id_1")
     task_2 = Task("task_config_2", {}, print, id="task_id_2")
 
@@ -113,8 +100,6 @@ def test_get_latest_submission(init_sql_repo):
 
 
 def test_delete_submission(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission = Submission("entity_id", "submission_id", "entity_config_id")
@@ -140,8 +125,6 @@ def test_delete_submission(init_sql_repo):
 
 
 def test_is_deletable(init_sql_repo):
-    init_managers()
-
     submission_manager = _SubmissionManagerFactory._build_manager()
 
     submission = Submission("entity_id", "submission_id", "entity_config_id")

+ 0 - 29
tests/core/task/test_task_manager_with_sql_repo.py

@@ -19,25 +19,14 @@ from taipy.config.config import Config
 from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._version._version_manager import _VersionManager
 from taipy.core.data._data_manager import _DataManager
-from taipy.core.data._data_manager_factory import _DataManagerFactory
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions.exceptions import ModelNotFound, NonExistingTask
-from taipy.core.job._job_manager_factory import _JobManagerFactory
 from taipy.core.task._task_manager import _TaskManager
-from taipy.core.task._task_manager_factory import _TaskManagerFactory
 from taipy.core.task.task import Task
 from taipy.core.task.task_id import TaskId
 
 
-def init_managers():
-    _JobManagerFactory._build_manager()._delete_all()
-    _TaskManagerFactory._build_manager()._delete_all()
-    _DataManagerFactory._build_manager()._delete_all()
-
-
 def test_create_and_save(init_sql_repo):
-    init_managers()
-
     input_configs = [Config.configure_data_node("my_input", "in_memory")]
     output_configs = Config.configure_data_node("my_output", "in_memory")
     task_config = Config.configure_task("foo", print, input_configs, output_configs)
@@ -66,8 +55,6 @@ def test_create_and_save(init_sql_repo):
 
 
 def test_do_not_recreate_existing_data_node(init_sql_repo):
-    init_managers()
-
     input_config = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO)
     output_config = Config.configure_data_node("my_output", "in_memory", scope=Scope.SCENARIO)
 
@@ -80,7 +67,6 @@ def test_do_not_recreate_existing_data_node(init_sql_repo):
 
 
 def test_do_not_recreate_existing_task(init_sql_repo):
-    init_managers()
     assert len(_TaskManager._get_all()) == 0
 
     input_config_scope_scenario = Config.configure_data_node("my_input_1", "in_memory", Scope.SCENARIO)
@@ -166,8 +152,6 @@ def test_do_not_recreate_existing_task(init_sql_repo):
 
 
 def test_set_and_get_task(init_sql_repo):
-    init_managers()
-
     task_id_1 = TaskId("id1")
     first_task = Task("name_1", {}, print, [], [], task_id_1)
     task_id_2 = TaskId("id2")
@@ -218,9 +202,6 @@ def test_set_and_get_task(init_sql_repo):
 
 
 def test_get_all_on_multiple_versions_environment(init_sql_repo):
-    Config.configure_global_app(repository_type="sql")
-    init_managers()
-
     # Create 5 tasks with 2 versions each
     # Only version 1.0 has the task with config_id = "config_id_1"
     # Only version 2.0 has the task with config_id = "config_id_6"
@@ -254,8 +235,6 @@ def test_get_all_on_multiple_versions_environment(init_sql_repo):
 
 
 def test_ensure_conservation_of_order_of_data_nodes_on_task_creation(init_sql_repo):
-    init_managers()
-
     embedded_1 = Config.configure_data_node("dn_1", "in_memory", scope=Scope.SCENARIO)
     embedded_2 = Config.configure_data_node("dn_2", "in_memory", scope=Scope.SCENARIO)
     embedded_3 = Config.configure_data_node("a_dn_3", "in_memory", scope=Scope.SCENARIO)
@@ -278,8 +257,6 @@ def test_ensure_conservation_of_order_of_data_nodes_on_task_creation(init_sql_re
 
 
 def test_delete_raise_exception(init_sql_repo):
-    init_managers()
-
     dn_input_config_1 = Config.configure_data_node(
         "my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing"
     )
@@ -293,8 +270,6 @@ def test_delete_raise_exception(init_sql_repo):
 
 
 def test_hard_delete(init_sql_repo):
-    init_managers()
-
     dn_input_config_1 = Config.configure_data_node(
         "my_input_1", "in_memory", scope=Scope.SCENARIO, default_data="testing"
     )
@@ -354,8 +329,6 @@ def test_submit_task():
 
 
 def test_get_tasks_by_config_id(init_sql_repo):
-    init_managers()
-
     dn_config = Config.configure_data_node("dn", scope=Scope.SCENARIO)
     task_config_1 = Config.configure_task("t1", print, dn_config)
     task_config_2 = Config.configure_task("t2", print, dn_config)
@@ -387,8 +360,6 @@ def test_get_tasks_by_config_id(init_sql_repo):
 
 
 def test_get_scenarios_by_config_id_in_multiple_versions_environment(init_sql_repo):
-    init_managers()
-
     dn_config = Config.configure_data_node("dn", scope=Scope.SCENARIO)
     task_config_1 = Config.configure_task("t1", print, dn_config)
     task_config_2 = Config.configure_task("t2", print, dn_config)

+ 0 - 7
tests/core/test_complex_application.py

@@ -19,8 +19,6 @@ import pandas as pd
 import taipy.core.taipy as tp
 from taipy.config import Config
 from taipy.core import Core, Status
-from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
-from taipy.core.config.job_config import JobConfig
 
 # ################################  USER FUNCTIONS  ##################################
 
@@ -71,8 +69,6 @@ def return_a_number_with_sleep():
 
 
 def test_skipped_jobs():
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_orchestrator()
     input_config = Config.configure_data_node("input_dn")
     intermediate_config = Config.configure_data_node("intermediate")
     output_config = Config.configure_data_node("output_dn")
@@ -116,9 +112,6 @@ def test_complex():
     # |      |
     # t4     d4
 
-    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
-    _OrchestratorFactory._build_orchestrator()
-
     csv_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
     excel_path_inp = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.xlsx")
 

+ 6 - 12
tests/core/test_core_cli_with_sql_repo.py

@@ -98,9 +98,8 @@ def test_core_cli_production_mode(init_sql_repo):
         core.stop()
 
 
-def test_dev_mode_clean_all_entities_of_the_latest_version(init_sql_repo, init_managers):
+def test_dev_mode_clean_all_entities_of_the_latest_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     # Create a scenario in development mode
     with patch("sys.argv", ["prog"]):
@@ -302,9 +301,8 @@ def test_version_number_when_switching_mode(init_sql_repo):
         core.stop()
 
 
-def test_production_mode_load_all_entities_from_previous_production_version(init_sql_repo, init_managers):
+def test_production_mode_load_all_entities_from_previous_production_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--development"]):
         core = Core()
@@ -353,9 +351,8 @@ def test_production_mode_load_all_entities_from_previous_production_version(init
         core.stop()
 
 
-def test_force_override_experiment_version(init_sql_repo, init_managers):
+def test_force_override_experiment_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         core = Core()
@@ -406,9 +403,8 @@ def test_force_override_experiment_version(init_sql_repo, init_managers):
     assert len(_JobManager._get_all()) == 2
 
 
-def test_force_override_production_version(init_sql_repo, init_managers):
+def test_force_override_production_version(init_sql_repo):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--production", "1.0"]):
         core = Core()
@@ -461,9 +457,8 @@ def test_force_override_production_version(init_sql_repo, init_managers):
         core.stop()
 
 
-def test_modify_config_properties_without_force(caplog, init_sql_repo, init_config, init_managers):
+def test_modify_config_properties_without_force(caplog, init_sql_repo, init_config):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         core = Core()
@@ -504,9 +499,8 @@ def test_modify_config_properties_without_force(caplog, init_sql_repo, init_conf
     assert 'DATA_NODE "d2" has attribute "exposed_type" modified' in error_message
 
 
-def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, init_config, init_managers):
+def test_modify_job_configuration_dont_stop_application(caplog, init_sql_repo, init_config):
     scenario_config = config_scenario()
-    init_managers()
 
     with patch("sys.argv", ["prog", "--experiment", "1.0"]):
         Config.configure_job_executions(mode="development")

+ 3 - 3
tests/core/utils/__init__.py

@@ -9,11 +9,11 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+from datetime import datetime
+from time import sleep
 
-def assert_true_after_time(assertion, msg=None, time=120):
-    from datetime import datetime
-    from time import sleep
 
+def assert_true_after_time(assertion, msg=None, time=120):
     loops = 0
     start = datetime.now()
     while (datetime.now() - start).seconds < time: