
Test/integration testing (#951)

Add debug logs
---------

Co-authored-by: trgiangdo <dtr.giang.1299@gmail.com>
Co-authored-by: Toan Quach <shiro@Shiros-MacBook-Pro.local>
Jean-Robin committed 1 year ago
Commit 8b71da8360

+ 23 - 18
taipy/core/_orchestrator/_dispatcher/_job_dispatcher.py

@@ -55,30 +55,35 @@ class _JobDispatcher(threading.Thread):
             wait (bool): If True, the method will wait for the dispatcher to stop.
             timeout (Optional[float]): The maximum time to wait. If None, the method will wait indefinitely.
         """
-        self.stop_wait = wait
-        self.stop_timeout = timeout
         self._STOP_FLAG = True
+        if wait and self.is_running():
+            self._logger.debug("Waiting for the dispatcher thread to stop...")
+            self.join(timeout=timeout)
 
     def run(self):
         self._logger.debug("Job dispatcher started.")
         while not self._STOP_FLAG:
-            try:
-                if self._can_execute():
-                    with self.lock:
-                        if self._STOP_FLAG:
-                            break
+            if not self._can_execute():
+                time.sleep(0.2)  # We need to sleep to avoid busy waiting.
+                continue
+
+            with self.lock:
+                self._logger.debug("Acquiring lock to check jobs to run.")
+                job = None
+                try:
+                    if not self._STOP_FLAG:
                         job = self.orchestrator.jobs_to_run.get(block=True, timeout=0.1)
-                    self._execute_job(job)
-                else:
-                    time.sleep(0.1)  # We need to sleep to avoid busy waiting.
-            except Empty:  # In case the last job of the queue has been removed.
-                pass
-            except Exception as e:
-                self._logger.exception(e)
-                pass
-        if self.stop_wait:
-            self._logger.debug("Waiting for the dispatcher thread to stop...")
-            self.join(timeout=self.stop_timeout)
+                except Empty:  # In case the last job of the queue has been removed.
+                    pass
+            if job:
+                self._logger.debug(f"Got a job to execute {job.id}.")
+                try:
+                    if not self._STOP_FLAG:
+                        self._execute_job(job)
+                    else:
+                        self.orchestrator.jobs_to_run.put(job)
+                except Exception as e:
+                    self._logger.exception(e)
         self._logger.debug("Job dispatcher stopped.")
 
     @abstractmethod
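The reworked stop() above also fixes a latent bug: the old code set stop_wait and then called self.join() at the end of run(), i.e. from inside the dispatcher's own thread, where join() raises RuntimeError ("cannot join current thread"). The new version joins from the caller's thread. A minimal sketch of the resulting pattern, with hypothetical Worker/stop names and a queue-driven loop like the dispatcher's:

```python
import threading
import time
from queue import Empty, Queue


class Worker(threading.Thread):
    """Queue-driven worker with a cooperative stop flag (illustrative sketch)."""

    def __init__(self):
        super().__init__()
        self._stop_flag = False
        self.jobs: Queue = Queue()

    def run(self):
        while not self._stop_flag:
            try:
                # A short timeout keeps the loop responsive to the stop flag.
                job = self.jobs.get(block=True, timeout=0.1)
            except Empty:
                continue
            print(f"processing {job}")

    def stop(self, wait: bool = True, timeout=None):
        self._stop_flag = True
        # join() is called from the caller's thread; a thread cannot join itself.
        if wait and self.is_alive():
            self.join(timeout=timeout)


worker = Worker()
worker.start()
worker.jobs.put("job-1")
time.sleep(0.3)
worker.stop()
```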

+ 12 - 10
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -11,6 +11,7 @@
 
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
+from threading import Lock
 from typing import Callable, Optional
 
 from taipy.config._serializer._toml_serializer import _TomlSerializer
@@ -25,6 +26,7 @@ from ._task_function_wrapper import _TaskFunctionWrapper
 class _StandaloneJobDispatcher(_JobDispatcher):
     """Manages job dispatching (instances of `Job^` class) in an asynchronous way using a ProcessPoolExecutor."""
 
+    _nb_available_workers_lock = Lock()
     _DEFAULT_MAX_NB_OF_WORKERS = 2
 
     def __init__(self, orchestrator: _AbstractOrchestrator, subproc_initializer: Optional[Callable] = None):
@@ -38,12 +40,14 @@ class _StandaloneJobDispatcher(_JobDispatcher):
 
     def _can_execute(self) -> bool:
         """Returns True if the dispatcher have resources to dispatch a job."""
-        return self._nb_available_workers > 0
+        with self._nb_available_workers_lock:
+            self._logger.debug(f"{self._nb_available_workers=}")
+            return self._nb_available_workers > 0
 
     def run(self):
         with self._executor:
             super().run()
-        self._logger.debug("Standalone job dispatcher: Pool executor shut down")
+        self._logger.debug("Standalone job dispatcher: Pool executor shut down.")
 
     def _dispatch(self, job: Job):
         """Dispatches the given `Job^` on an available worker for execution.
@@ -51,17 +55,15 @@ class _StandaloneJobDispatcher(_JobDispatcher):
         Parameters:
             job (Job^): The job to submit on an executor with an available worker.
         """
-
-        self._nb_available_workers -= 1
+        with self._nb_available_workers_lock:
+            self._nb_available_workers -= 1
+            self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
         config_as_string = _TomlSerializer()._serialize(Config._applied_config)  # type: ignore[attr-defined]
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
-
-        future.add_done_callback(self._release_worker)  # We must release the worker before updating the job status
-        # so that the worker is available for another job as soon as possible.
         future.add_done_callback(partial(self._update_job_status_from_future, job))
 
-    def _release_worker(self, _):
-        self._nb_available_workers += 1
-
     def _update_job_status_from_future(self, job: Job, ft):
+        with self._nb_available_workers_lock:
+            self._nb_available_workers += 1
+            self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
         self._update_job_status(job, ft.result())
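Together these hunks replace the separate _release_worker done-callback with a release performed inside _update_job_status_from_future, and every read or write of _nb_available_workers now happens under _nb_available_workers_lock. A self-contained sketch of the counting scheme (hypothetical Pool class; ThreadPoolExecutor stands in for the ProcessPoolExecutor so the example runs without a __main__ guard):

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock


class Pool:
    """Tracks free worker slots under a lock (illustrative sketch)."""

    def __init__(self, workers: int = 2):
        self._executor = ThreadPoolExecutor(max_workers=workers)
        self._nb_available_workers = workers
        self._lock = Lock()

    def can_execute(self) -> bool:
        with self._lock:
            return self._nb_available_workers > 0

    def submit(self, fn, *args):
        with self._lock:
            self._nb_available_workers -= 1  # take a slot before submitting
        future = self._executor.submit(fn, *args)
        # The slot is released in the done callback, together with any
        # status bookkeeping, as in the patched dispatcher.
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, _future):
        with self._lock:
            self._nb_available_workers += 1


pool = Pool()
print(pool.can_execute())                # True
print(pool.submit(pow, 2, 10).result())  # 1024; the callback frees the slot
```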

+ 21 - 12
taipy/core/_orchestrator/_orchestrator.py

@@ -36,7 +36,7 @@ class _Orchestrator(_AbstractOrchestrator):
     """
 
     jobs_to_run: Queue = Queue()
-    blocked_jobs: List = []
+    blocked_jobs: List[Job] = []
 
     lock = Lock()
     __logger = _TaipyLogger._get_logger()
@@ -58,7 +58,7 @@ class _Orchestrator(_AbstractOrchestrator):
         """Submit the given `Scenario^` or `Sequence^` for an execution.
 
         Parameters:
-             submittable (Union[SCenario^, Sequence^]): The scenario or sequence to submit for execution.
+             submittable (Union[Scenario^, Sequence^]): The scenario or sequence to submit for execution.
              callbacks: The optional list of functions that should be executed on jobs status change.
             force (bool): Enforce execution of the scenario's or sequence's tasks even if their output data
                 nodes are cached.
@@ -66,7 +66,7 @@ class _Orchestrator(_AbstractOrchestrator):
                 finished in asynchronous mode.
              timeout (Union[float, int]): The optional maximum number of seconds to wait for the jobs to be finished
                 before returning.
-             **properties (dict[str, any]): A keyworded variable length list of user additional arguments
+             **properties (dict[str, any]): A variable-length list of additional keyword arguments
                 that will be stored within the `Submission^`. It can be accessed via `Submission.properties^`.
         Returns:
             The created `Submission^` containing the information about the submission.
@@ -80,6 +80,7 @@ class _Orchestrator(_AbstractOrchestrator):
         jobs: List[Job] = []
         tasks = submittable._get_sorted_tasks()
         with cls.lock:
+            cls.__logger.debug(f"Acquiring lock to submit {submission.entity_id}.")
             for ts in tasks:
                 jobs.extend(
                     cls._lock_dn_output_and_create_job(
@@ -91,8 +92,8 @@ class _Orchestrator(_AbstractOrchestrator):
                     )
                     for task in ts
                 )
-        submission.jobs = jobs  # type: ignore
-        cls._orchestrate_job_to_run_or_block(jobs)
+            submission.jobs = jobs  # type: ignore
+            cls._orchestrate_job_to_run_or_block(jobs)
         if Config.job_config.is_development:
             cls._check_and_execute_jobs_if_development_mode()
         elif wait:
@@ -119,7 +120,7 @@ class _Orchestrator(_AbstractOrchestrator):
                 in asynchronous mode.
              timeout (Union[float, int]): The optional maximum number of seconds to wait for the job
                 to be finished before returning.
-             **properties (dict[str, any]): A keyworded variable length list of user additional arguments
+             **properties (dict[str, any]): A variable-length list of additional keyword arguments
                 that will be stored within the `Submission^`. It can be accessed via `Submission.properties^`.
         Returns:
             The created `Submission^` containing the information about the submission.
@@ -129,6 +130,7 @@ class _Orchestrator(_AbstractOrchestrator):
         )
         submit_id = submission.id
         with cls.lock:
+            cls.__logger.debug(f"Acquiring lock to submit task {task.id}.")
             job = cls._lock_dn_output_and_create_job(
                 task,
                 submit_id,
@@ -136,9 +138,9 @@ class _Orchestrator(_AbstractOrchestrator):
                 itertools.chain([cls._update_submission_status], callbacks or []),
                 force,
             )
-        jobs = [job]
-        submission.jobs = jobs  # type: ignore
-        cls._orchestrate_job_to_run_or_block(jobs)
+            jobs = [job]
+            submission.jobs = jobs  # type: ignore
+            cls._orchestrate_job_to_run_or_block(jobs)
         if Config.job_config.is_development:
             cls._check_and_execute_jobs_if_development_mode()
         else:
@@ -223,17 +225,22 @@ class _Orchestrator(_AbstractOrchestrator):
     @classmethod
     def _on_status_change(cls, job: Job):
         if job.is_completed() or job.is_skipped():
+            cls.__logger.debug(f"{job.id} has been completed or skipped. Unblocking jobs.")
             cls.__unblock_jobs()
         elif job.is_failed():
             cls._fail_subsequent_jobs(job)
 
     @classmethod
     def __unblock_jobs(cls):
-        for job in cls.blocked_jobs:
-            if not cls._is_blocked(job):
-                with cls.lock:
+        with cls.lock:
+            cls.__logger.debug("Acquiring lock to unblock jobs.")
+            for job in cls.blocked_jobs:
+                if not cls._is_blocked(job):
+                    cls.__logger.debug(f"Unblocking job: {job.id}.")
                     job.pending()
+                    cls.__logger.debug(f"Removing job {job.id} from the blocked_job list.")
                     cls.__remove_blocked_job(job)
+                    cls.__logger.debug(f"Adding job {job.id} to the list of jobs to run.")
                     cls.jobs_to_run.put(job)
 
     @classmethod
@@ -253,6 +260,7 @@ class _Orchestrator(_AbstractOrchestrator):
             cls.__logger.info(f"{job.id} has already failed and cannot be canceled.")
         else:
             with cls.lock:
+                cls.__logger.debug(f"Acquiring lock to cancel job {job.id}.")
                 to_cancel_or_abandon_jobs = {job}
                 to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys())))
                 cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs)
@@ -292,6 +300,7 @@ class _Orchestrator(_AbstractOrchestrator):
     @classmethod
     def _fail_subsequent_jobs(cls, failed_job: Job):
         with cls.lock:
+            cls.__logger.debug("Acquiring lock to fail subsequent jobs.")
             to_fail_or_abandon_jobs = set()
             to_fail_or_abandon_jobs.update(
                 cls.__find_subsequent_jobs(failed_job.submit_id, set(failed_job.task.output.keys()))
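The common thread in this file's hunks is lock discipline: submission.jobs and the orchestration step moved inside the cls.lock block, and __unblock_jobs now holds the lock for the entire scan instead of taking it per job, so the blocked_jobs list and the jobs_to_run queue are never mutated concurrently. A condensed sketch of that single-lock invariant (hypothetical is_blocked predicate and dict-shaped jobs):

```python
from queue import Queue
from threading import Lock

lock = Lock()
jobs_to_run: Queue = Queue()
blocked_jobs: list = []


def is_blocked(job: dict) -> bool:
    # Stand-in predicate: blocked while any input is still missing.
    return bool(job["inputs_missing"])


def unblock_jobs():
    # Hold the lock for the whole scan so no other thread can reorder
    # the blocked list or the run queue mid-iteration.
    with lock:
        for job in list(blocked_jobs):
            if not is_blocked(job):
                blocked_jobs.remove(job)
                jobs_to_run.put(job)


blocked_jobs.append({"id": "job-1", "inputs_missing": False})
unblock_jobs()
print(jobs_to_run.get_nowait()["id"])  # job-1
```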

+ 17 - 3
taipy/core/_repository/_sql_repository.py

@@ -11,11 +11,13 @@
 
 import json
 import pathlib
+from sqlite3 import DatabaseError
 from typing import Any, Dict, Iterable, List, Optional, Type, Union
 
 from sqlalchemy.dialects import sqlite
 from sqlalchemy.exc import NoResultFound
 
+from ...logger._taipy_logger import _TaipyLogger
 from .._repository._abstract_repository import _AbstractRepository
 from ..common.typing import Converter, Entity, ModelType
 from ..exceptions import ModelNotFound
@@ -23,6 +25,8 @@ from .db._sql_connection import _SQLConnection
 
 
 class _SQLRepository(_AbstractRepository[ModelType, Entity]):
+    _logger = _TaipyLogger._get_logger()
+
     def __init__(self, model_type: Type[ModelType], converter: Type[Converter]):
         """
         Holds common methods to be used and extended when the need for saving
@@ -47,9 +51,19 @@ class _SQLRepository(_AbstractRepository[ModelType, Entity]):
     def _save(self, entity: Entity):
         obj = self.converter._entity_to_model(entity)
         if self._exists(entity.id):  # type: ignore
-            self._update_entry(obj)
-            return
-        self.__insert_model(obj)
+            try:
+                self._update_entry(obj)
+                return
+            except DatabaseError as e:
+                self._logger.error(f"Error while updating {entity.id} in {self.table.name}. ")  # type: ignore
+                self._logger.error(f"Error : {e}")
+                raise e
+        try:
+            self.__insert_model(obj)
+        except DatabaseError as e:
+            self._logger.error(f"Error while inserting {entity.id} into {self.table.name}. ")  # type: ignore
+            self._logger.error(f"Error : {e}")
+            raise e
 
     def _exists(self, entity_id: str):
         query = self.table.select().filter_by(id=entity_id)
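The new _save wraps both branches so a sqlite3.DatabaseError is logged with the entity and table before being re-raised. A runnable approximation of the same update-or-insert flow (hypothetical entity table; rowcount stands in for the repository's _exists check):

```python
import logging
import sqlite3
from sqlite3 import DatabaseError

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("repo")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entity (id TEXT PRIMARY KEY, data TEXT)")


def save(entity_id: str, data: str):
    try:
        cur = conn.execute("UPDATE entity SET data = ? WHERE id = ?", (data, entity_id))
        if cur.rowcount:  # the row existed: the update path is done
            return
    except DatabaseError as e:
        logger.error(f"Error while updating {entity_id} in entity: {e}")
        raise
    try:
        conn.execute("INSERT INTO entity (id, data) VALUES (?, ?)", (entity_id, data))
    except DatabaseError as e:
        logger.error(f"Error while inserting {entity_id} into entity: {e}")
        raise


save("e-1", "v1")
save("e-1", "v2")
print(conn.execute("SELECT data FROM entity WHERE id = 'e-1'").fetchone())  # ('v2',)
```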

+ 1 - 1
taipy/core/_repository/db/_sql_connection.py

@@ -84,4 +84,4 @@ def _build_connection() -> Connection:
 
 @lru_cache
 def __build_connection(db_location: str):
-    return sqlite3.connect(db_location, check_same_thread=False)
+    return sqlite3.connect(db_location, check_same_thread=False, timeout=20)
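The timeout argument is sqlite's busy timeout: when another connection holds the write lock, this connection retries for up to 20 seconds before raising sqlite3.OperationalError ("database is locked"), instead of giving up after the 5-second default. A small demonstration with a hypothetical demo.db file:

```python
import sqlite3
import threading

# Two connections to the same on-disk database.
a = sqlite3.connect("demo.db", check_same_thread=False, timeout=20)
b = sqlite3.connect("demo.db", check_same_thread=False, timeout=20)
a.execute("CREATE TABLE IF NOT EXISTS t (x)")
a.commit()

a.execute("BEGIN IMMEDIATE")            # a takes the write lock...
threading.Timer(1.0, a.commit).start()  # ...and releases it after one second

b.execute("INSERT INTO t VALUES (1)")   # waits ~1 s for the lock, then proceeds
b.commit()
print("insert succeeded without 'database is locked'")
```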

+ 3 - 2
taipy/core/job/job.py

@@ -33,6 +33,7 @@ if TYPE_CHECKING:
 def _run_callbacks(fn):
     def __run_callbacks(job):
         fn(job)
+        _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.")
         for fct in job._subscribers:
             fct(job)
 
@@ -200,6 +201,7 @@ class Job(_Entity, _Labeled):
     def completed(self):
         """Set the status to _completed_ and notify subscribers."""
         self.status = Status.COMPLETED
+        self.__logger.info(f"job {self.id} is completed.")
 
     @_run_callbacks
     def skipped(self):
@@ -287,7 +289,7 @@ class Job(_Entity, _Labeled):
         return self.is_completed() or self.is_failed() or self.is_canceled() or self.is_skipped() or self.is_abandoned()
 
     def _is_finished(self) -> bool:
-        """Indicate if the job is finished. This function will not triggered the persistency feature like is_finished().
+        """Indicate if the job is finished. This function will not trigger the persistence feature like is_finished().
 
         Returns:
             True if the job is finished.
@@ -322,7 +324,6 @@ class Job(_Entity, _Labeled):
                 self.__logger.error(st)
         else:
             self.completed()
-            self.__logger.info(f"job {self.id} is completed.")
 
     def __hash__(self):
         return hash(self.id)
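The moved info log now lives inside completed() itself, so it is emitted through the same _run_callbacks decorator that notifies subscribers. A minimal sketch of that decorator pattern (hypothetical Job class and plain strings instead of the Status enum):

```python
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("job")


def run_callbacks(fn):
    """After the wrapped status setter runs, log and notify subscribers."""

    def wrapper(job):
        fn(job)
        logger.debug(f"{job.id} status has changed to {job.status}.")
        for subscriber in job.subscribers:
            subscriber(job)

    return wrapper


class Job:
    def __init__(self, job_id: str):
        self.id = job_id
        self.status = "PENDING"
        self.subscribers = []

    @run_callbacks
    def completed(self):
        self.status = "COMPLETED"
        logger.info(f"job {self.id} is completed.")


job = Job("JOB_1")
job.subscribers.append(lambda j: print(f"notified: {j.id} -> {j.status}"))
job.completed()
```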

+ 10 - 1
taipy/core/submission/submission.py

@@ -14,6 +14,8 @@ import uuid
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Set, Union
 
+from taipy.logger._taipy_logger import _TaipyLogger
+
 from .._entity._entity import _Entity
 from .._entity._labeled import _Labeled
 from .._entity._properties import _Properties
@@ -43,6 +45,7 @@ class Submission(_Entity, _Labeled):
     _MANAGER_NAME = "submission"
     __SEPARATOR = "_"
     lock = threading.Lock()
+    __logger = _TaipyLogger._get_logger()
 
     def __init__(
         self,
@@ -201,7 +204,10 @@ class Submission(_Entity, _Labeled):
             job_status = job.status
             if job_status == Status.FAILED:
                 submission._submission_status = SubmissionStatus.FAILED
-                _SubmissionManagerFactory._build_manager()._set(submission)
+                submission_manager._set(submission)
+                self.__logger.debug(
+                    f"{job.id} status is {job_status}. Submission status set to {submission._submission_status}."
+                )
                 return
             if job_status == Status.CANCELED:
                 submission._is_canceled = True
@@ -242,6 +248,9 @@ class Submission(_Entity, _Labeled):
                 submission.submission_status = SubmissionStatus.COMPLETED  # type: ignore
             else:
                 submission.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
+            self.__logger.debug(
+                f"{job.id} status is {job_status}. Submission status set to {submission._submission_status}."
+            )
 
     def is_finished(self) -> bool:
         """Indicate if the submission is finished.

+ 2 - 2
taipy/logger/_taipy_logger.py

@@ -34,7 +34,7 @@ class _TaipyLogger:
             cls.__logger.setLevel(logging.INFO)
             ch = logging.StreamHandler(sys.stdout)
             ch.setLevel(logging.INFO)
-            formatter = logging.Formatter("[%(asctime)s][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")
-            ch.setFormatter(formatter)
+            f = logging.Formatter("[%(asctime)s.%(msecs)03d][%(name)s][%(levelname)s] %(message)s", "%Y-%m-%d %H:%M:%S")
+            ch.setFormatter(f)
             cls.__logger.addHandler(ch)
         return cls.__logger
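The formatter change adds millisecond precision. The strftime-style datefmt argument cannot express sub-second precision (%f is not supported by time.strftime), so the record's msecs attribute is appended instead, which is the standard idiom:

```python
import logging
import sys

handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "[%(asctime)s.%(msecs)03d][%(name)s][%(levelname)s] %(message)s",
    "%Y-%m-%d %H:%M:%S",
))
logger = logging.getLogger("Taipy")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.debug("dispatcher started")
# e.g. [2024-01-02 12:00:00.123][Taipy][DEBUG] dispatcher started
```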

+ 2 - 5
tests/core/_orchestrator/_dispatcher/mock_standalone_dispatcher.py

@@ -10,6 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 from concurrent.futures import Executor, Future
+from threading import Lock
 from typing import List
 
 from taipy.core import Job
@@ -39,9 +40,9 @@ class MockStandaloneDispatcher(_StandaloneJobDispatcher):
         super(_StandaloneJobDispatcher, self).__init__(orchestrator)
         self._executor: Executor = MockProcessPoolExecutor()
         self._nb_available_workers = 1
+        self._nb_available_workers_lock = Lock()
 
         self.dispatch_calls: List = []
-        self.release_worker_calls: List = []
         self.update_job_status_from_future_calls: List = []
 
     def mock_exception_for_job(self, task_id, e: Exception):
@@ -51,10 +52,6 @@ class MockStandaloneDispatcher(_StandaloneJobDispatcher):
         self.dispatch_calls.append(job)
         super()._dispatch(job)
 
-    def _release_worker(self, _):
-        self.release_worker_calls.append(None)
-        super()._release_worker(_)
-
     def _update_job_status_from_future(self, job: Job, ft):
         self.update_job_status_from_future_calls.append((job, ft))
         super()._update_job_status_from_future(job, ft)

+ 2 - 13
tests/core/_orchestrator/_dispatcher/test_standalone_job_dispatcher.py

@@ -71,9 +71,6 @@ def test_dispatch_job():
     assert submit_first_call[1] == ()
     assert submit_first_call[2]["config_as_string"] == _TomlSerializer()._serialize(Config._applied_config)
 
-    # test that the worker is released after the job is done
-    assert len(dispatcher.release_worker_calls) == 1
-
     # test that the job status is updated after execution on future
     assert len(dispatcher.update_job_status_from_future_calls) == 1
     assert dispatcher.update_job_status_from_future_calls[0][0] == job
@@ -92,16 +89,6 @@ def test_can_execute():
     assert dispatcher._can_execute()
 
 
-def test_release_worker():
-    dispatcher = _StandaloneJobDispatcher(_OrchestratorFactory._orchestrator)
-
-    assert dispatcher._nb_available_workers == 2
-    dispatcher._release_worker(None)
-    assert dispatcher._nb_available_workers == 3
-    dispatcher._release_worker(None)
-    assert dispatcher._nb_available_workers == 4
-
-
 def test_update_job_status_from_future():
     task = create_task()
     job = Job(JobId("job"), task, "s_id", task.id)
@@ -109,7 +96,9 @@ def test_update_job_status_from_future():
     dispatcher = _StandaloneJobDispatcher(orchestrator)
     ft = Future()
     ft.set_result(None)
+    assert dispatcher._nb_available_workers == 2
     dispatcher._update_job_status_from_future(job, ft)
+    assert dispatcher._nb_available_workers == 3
     assert job.is_completed()
 
 

+ 1 - 1
tests/core/_orchestrator/test_orchestrator__cancel_jobs.py

@@ -59,7 +59,7 @@ def test_cancel_job_no_subsequent_jobs():
 
 def test_cancel_job_with_subsequent_blocked_jobs():
     scenario = create_scenario()
-    orchestrator = _OrchestratorFactory._build_orchestrator()
+    orchestrator = cast(_Orchestrator, _OrchestratorFactory._build_orchestrator())
     job1 = orchestrator._lock_dn_output_and_create_job(scenario.t1, "s_id", "e_id")
     job2 = orchestrator._lock_dn_output_and_create_job(scenario.t2, "s_id", "e_id")
     job3 = orchestrator._lock_dn_output_and_create_job(scenario.t3, "s_id", "e_id")
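The added cast() changes nothing at runtime; it only tells the type checker that the factory's return value is the concrete _Orchestrator, so access to its specific members type-checks. A generic illustration (hypothetical Base/Concrete classes):

```python
from typing import cast


class Base:
    pass


class Concrete(Base):
    def only_on_concrete(self) -> str:
        return "ok"


def build() -> Base:  # factory declared with the base return type
    return Concrete()


obj = cast(Concrete, build())  # no-op at runtime; narrows the static type
print(obj.only_on_concrete())
```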

+ 3 - 0
tests/core/_orchestrator/test_orchestrator_factory.py

@@ -113,6 +113,9 @@ def test_build_unknown_dispatcher():
     with pytest.raises(ModeNotAvailable):
         _OrchestratorFactory._build_dispatcher()
 
+    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
+    _OrchestratorFactory._build_dispatcher()
+
 
 def test_remove_dispatcher_not_built():
     _OrchestratorFactory._dispatcher = None

+ 3 - 2
tests/core/config/test_override_config.py

@@ -60,13 +60,14 @@ def test_override_default_configuration_with_code_configuration():
 def test_override_default_config_with_code_config_including_env_variable_values():
     Config.configure_core()
     assert Config.core.repository_type == "filesystem"
-    Config.configure_core(repository_type="othertype")
-    assert Config.core.repository_type == "othertype"
 
     with mock.patch.dict(os.environ, {"REPOSITORY_TYPE": "foo"}):
         Config.configure_core(repository_type="ENV[REPOSITORY_TYPE]")
         assert Config.core.repository_type == "foo"
 
+    Config.configure_core(repository_type="othertype")
+    assert Config.core.repository_type == "othertype"
+
 
 def test_override_default_configuration_with_file_configuration():
     tf = NamedTemporaryFile(

+ 9 - 8
tests/core/conftest.py

@@ -180,14 +180,9 @@ def default_multi_sheet_data_frame():
 def cleanup_files():
     yield
 
-    if os.path.exists(".data"):
-        shutil.rmtree(".data", ignore_errors=True)
-    if os.path.exists("user_data"):
-        shutil.rmtree("user_data", ignore_errors=True)
-    if os.path.exists(".taipy"):
-        shutil.rmtree(".taipy", ignore_errors=True)
-    if os.path.exists(".my_data"):
-        shutil.rmtree(".my_data", ignore_errors=True)
+    for path in [".data", ".my_data", "user_data", ".taipy"]:
+        if os.path.exists(path):
+            shutil.rmtree(path, ignore_errors=True)
 
 
 @pytest.fixture(scope="function")
@@ -331,6 +326,12 @@ def clean_repository(init_config, init_managers, init_orchestrator, init_notifie
     with patch("sys.argv", ["prog"]):
         yield
 
+    close_all_sessions()
+    init_orchestrator()
+    init_managers()
+    init_config()
+    init_notifier()
+
 
 @pytest.fixture
 def init_config(reset_configuration_singleton, inject_core_sections):

+ 1 - 9
tests/core/job/test_job_manager.py

@@ -138,15 +138,6 @@ def test_delete_job():
     assert _JobManager._get(job_1.id) is None
 
 
-m = multiprocessing.Manager()
-lock = m.Lock()
-
-
-def inner_lock_multiply(nb1: float, nb2: float):
-    with lock:
-        return multiply(1 or nb1, 2 or nb2)
-
-
 def test_raise_when_trying_to_delete_unfinished_job():
     Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE, max_nb_of_workers=2)
     m = multiprocessing.Manager()
@@ -326,6 +317,7 @@ def test_cancel_subsequent_jobs():
     orchestrator = _OrchestratorFactory._orchestrator
     submission_manager = _SubmissionManagerFactory._build_manager()
 
+    m = multiprocessing.Manager()
     lock_0 = m.Lock()
 
     dn_1 = InMemoryDataNode("dn_config_1", Scope.SCENARIO, properties={"default_data": 1})
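Dropping the module-level multiprocessing.Manager() (and the inner_lock_multiply helper that used it) means a manager server process is no longer spawned as a side effect of merely importing the test module; each test that needs a cross-process lock now creates its own. A hypothetical minimal version of that per-test pattern:

```python
import multiprocessing


def test_with_shared_lock():
    m = multiprocessing.Manager()  # starts the manager process on demand
    lock = m.Lock()                # proxy lock shareable across processes
    with lock:
        assert True
    m.shutdown()                   # stop the manager process deterministically
```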