Browse Source

fix: mypy issues on orchestrator

trgiangdo 1 year ago
parent
commit
85ab305531

+ 2 - 2
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -12,7 +12,7 @@
 import threading
 from abc import abstractmethod
 from queue import Empty
-from typing import Dict
+from typing import Dict, Optional
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
@@ -32,7 +32,7 @@ class _JobDispatcher(threading.Thread):
     __logger = _TaipyLogger._get_logger()
     _nb_available_workers: int = 1
 
-    def __init__(self, orchestrator: _AbstractOrchestrator):
+    def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
         threading.Thread.__init__(self, name="Thread-Taipy-JobDispatcher")
         self.daemon = True
         self.orchestrator = orchestrator

+ 2 - 2
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -9,7 +9,7 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-from concurrent.futures import ProcessPoolExecutor
+from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
 from typing import Callable, Optional
 
@@ -28,7 +28,7 @@ class _StandaloneJobDispatcher(_JobDispatcher):
     def __init__(self, orchestrator: Optional[_AbstractOrchestrator], subproc_initializer: Optional[Callable] = None):
         super().__init__(orchestrator)
         max_workers = Config.job_config.max_nb_of_workers or 1
-        self._executor = ProcessPoolExecutor(max_workers=max_workers, initializer=subproc_initializer)  # type: ignore
+        self._executor: Executor = ProcessPoolExecutor(max_workers=max_workers, initializer=subproc_initializer)  # type: ignore
         self._nb_available_workers = self._executor._max_workers  # type: ignore
 
     def _dispatch(self, job: Job):

+ 11 - 10
tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py

@@ -8,8 +8,9 @@
 # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
+
 from concurrent.futures import Executor, Future
-from typing import Optional
+from typing import List, Optional
 
 from taipy.core import Job
 from taipy.core._orchestrator._abstract_orchestrator import _AbstractOrchestrator
@@ -17,8 +18,8 @@ from taipy.core._orchestrator._dispatcher import _StandaloneJobDispatcher
 
 
 class MockProcessPoolExecutor(Executor):
-    submit_called = []
-    f = []
+    submit_called: List = []
+    f: List = []
 
     def submit(self, fn, *args, **kwargs):
         self.submit_called.append((fn, args, kwargs))
@@ -36,15 +37,15 @@ class MockProcessPoolExecutor(Executor):
 class MockStandaloneDispatcher(_StandaloneJobDispatcher):
     def __init__(self, orchestrator: Optional[_AbstractOrchestrator]):
         super(_StandaloneJobDispatcher, self).__init__(orchestrator)
-        self._executor = MockProcessPoolExecutor()
-        self.dispatch_calls = []
-        self.release_worker_calls = []
-        self.set_dispatch_processes_calls = []
-        self.pop_dispatch_processes_calls = []
-        self.update_job_status_from_future_calls = []
+        self._executor: Executor = MockProcessPoolExecutor()
+        self.dispatch_calls: List = []
+        self.release_worker_calls: List = []
+        self.set_dispatch_processes_calls: List = []
+        self.pop_dispatch_processes_calls: List = []
+        self.update_job_status_from_future_calls: List = []
 
     def mock_exception_for_job(self, task_id, e: Exception):
-        self.exceptions[task_id] = e
+        self.exceptions[task_id] = e  # type: ignore[attr-defined]
 
     def _dispatch(self, job: Job):
         self.dispatch_calls.append(job)