瀏覽代碼

Merge pull request #861 from Avaiga/fix/performance-issue

release worker first in standalone mode
Jean-Robin 1 年之前
父節點
當前提交
605ad9e001

+ 3 - 0
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -10,6 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 import threading
+import time
 from abc import abstractmethod
 from queue import Empty
 from typing import Dict, Optional
@@ -69,6 +70,8 @@ class _JobDispatcher(threading.Thread):
                             break
                         job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1)
                     self._execute_job(job)
+                else:
+                    time.sleep(0.1)  # We need to sleep to avoid busy waiting.
             except Empty:  # In case the last job of the queue has been removed.
                 pass
             except Exception as e:

+ 2 - 1
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -51,8 +51,9 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
 
         self._set_dispatched_processes(job.id, future)  # type: ignore
+        future.add_done_callback(self._release_worker)  # We must release the worker before updating the job status
+        # so that the worker is available for another job as soon as possible.
         future.add_done_callback(partial(self._update_job_status_from_future, job))
-        future.add_done_callback(self._release_worker)
 
     def _release_worker(self, _):
         self._nb_available_workers += 1