浏览代码

Clean standalone run running under with context to automatically shutdown executor.

jrobinAV 1 年之前
父节点
当前提交
cc56f86d7a

+ 1 - 1
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -19,7 +19,7 @@ from ._task_function_wrapper import _TaskFunctionWrapper
 class _DevelopmentJobDispatcher(_JobDispatcher):
     """Manages job dispatching (instances of `Job^` class) in a synchronous way."""
 
-    def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
+    def __init__(self, orchestrator: _AbstractOrchestrator):
         super().__init__(orchestrator)
 
     def start(self):

+ 1 - 4
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -32,7 +32,7 @@ class _JobDispatcher(threading.Thread):
     _logger = _TaipyLogger._get_logger()
     _nb_available_workers: int = 1
 
-    def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
+    def __init__(self, orchestrator: _AbstractOrchestrator):
         threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
         self.daemon = True
         self.orchestrator = orchestrator
@@ -67,9 +67,6 @@ class _JobDispatcher(threading.Thread):
                 _TaipyLogger._get_logger().exception(e)
                 pass
 
-        # The dispatcher is now shutting down, let's shutdown its executor.
-        self._executor.shutdown(wait=True)
-
     def _can_execute(self) -> bool:
         """Returns True if the dispatcher have resources to execute a new job."""
         return self._nb_available_workers > 0

+ 9 - 2
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -24,12 +24,19 @@ from ._task_function_wrapper import _TaskFunctionWrapper
 class _StandaloneJobDispatcher(_JobDispatcher):
     """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor."""
 
-    def __init__(self, orchestrator: Optional[_AbstractOrchestrator], subproc_initializer: Optional[Callable] = None):
+    def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None):
         super().__init__(orchestrator)
         max_workers = Config.job_config.max_nb_of_workers or 1
-        self._executor: Executor = ProcessPoolExecutor(max_workers=max_workers, initializer=subproc_initializer)  # type: ignore
+        self._executor: Executor = ProcessPoolExecutor(
+            max_workers=max_workers,
+            initializer=subproc_initializer
+        )  # type: ignore
         self._nb_available_workers = self._executor._max_workers  # type: ignore
 
+    def run(self):
+        with self._executor:
+            super().run()
+
     def _dispatch(self, job: Job):
         """Dispatches the given `Job^` on an available worker for execution.
 

+ 1 - 1
tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py

@@ -35,7 +35,7 @@ class MockProcessPoolExecutor(Executor):
 
 
 class MockStandaloneDispatcher(_StandaloneJobDispatcher):
-    def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
+    def __init__(self, orchestrator: _AbstractOrchestrator):
         super(_StandaloneJobDispatcher, self).__init__(orchestrator)
         self._executor: Executor = MockProcessPoolExecutor()
         self.dispatch_calls: List = []