Переглянути джерело

Merge pull request #813 from Avaiga/gmarabout/improve_dispatcher_shutdown

🐛Shutdown executor after exiting dispatcher loop
Grégoire Marabout 1 рік тому
батько
коміт
58f5f8d645

+ 5 - 0
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -57,6 +57,8 @@ class _JobDispatcher(threading.Thread):
             try:
                 if self._can_execute():
                     with self.lock:
+                        if self._STOP_FLAG:
+                            break
                         job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1)
                     self._execute_job(job)
             except Empty:  # In case the last job of the queue has been removed.
@@ -65,6 +67,9 @@ class _JobDispatcher(threading.Thread):
                 _TaipyLogger._get_logger().exception(e)
                 pass
 
+        # The dispatcher is now shutting down, let's shutdown its executor.
+        self._executor.shutdown(wait=True)
+
     def _can_execute(self) -> bool:
         """Returns True if the dispatcher have resources to execute a new job."""
         return self._nb_available_workers > 0

+ 0 - 8
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -8,7 +8,6 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
-import sys
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
 from typing import Callable, Optional
@@ -52,10 +51,3 @@ class _StandaloneJobDispatcher(_JobDispatcher):
     def _update_job_status_from_future(self, job: Job, ft):
         self._pop_dispatched_process(job.id)  # type: ignore
         self._update_job_status(job, ft.result())
-
-    def stop(self):
-        super().stop()
-        if sys.version_info >= (3, 9):
-            self._executor.shutdown(wait=True, cancel_futures=False)
-        else:
-            self._executor.shutdown(wait=True)  # cancel_futures is not available in Python 3.8