فهرست منبع

add tests for submit_task

jrobinAV 1 سال پیش
والد
کامیت
e938933abf
3فایلهای تغییر یافته به همراه224 افزوده شده و 19 حذف شده
  1. 1 0
      Pipfile
  2. 15 19
      taipy/core/_orchestrator/_orchestrator.py
  3. 208 0
      tests/core/_orchestrator/test_orchestrator__submit_task.py

+ 1 - 0
Pipfile

@@ -40,6 +40,7 @@ autopep8 = "*"
 black = "*"
 flake8 = "*"
 flake8-docstrings = "*"
+freezegun = "*"
 ipython = "*"
 ipykernel = "*"
 isort = "*"

+ 15 - 19
taipy/core/_orchestrator/_orchestrator.py

@@ -18,16 +18,14 @@ from typing import Callable, Iterable, List, Optional, Set, Union
 
 from taipy.config.config import Config
 from taipy.logger._taipy_logger import _TaipyLogger
-
+from ._abstract_orchestrator import _AbstractOrchestrator
 from .._entity.submittable import Submittable
 from ..data._data_manager_factory import _DataManagerFactory
 from ..job._job_manager_factory import _JobManagerFactory
 from ..job.job import Job
 from ..job.job_id import JobId
-from ..scenario.scenario import Scenario
 from ..submission._submission_manager_factory import _SubmissionManagerFactory
 from ..task.task import Task
-from ._abstract_orchestrator import _AbstractOrchestrator
 
 
 class _Orchestrator(_AbstractOrchestrator):
@@ -96,7 +94,7 @@ class _Orchestrator(_AbstractOrchestrator):
             cls._check_and_execute_jobs_if_development_mode()
         else:
             if wait:
-                cls.__wait_until_job_finished(jobs, timeout=timeout)
+                cls._wait_until_job_finished(jobs, timeout=timeout)
 
         return jobs
 
@@ -113,7 +111,6 @@ class _Orchestrator(_AbstractOrchestrator):
 
         Parameters:
              task (Task^): The task to submit for execution.
-             submit_id (str): The optional id to differentiate each submission.
              callbacks: The optional list of functions that should be executed on job status change.
              force (bool): Enforce execution of the task even if its output data nodes are cached.
              wait (bool): Wait for the orchestrated job created from the task submission to be finished
@@ -143,7 +140,7 @@ class _Orchestrator(_AbstractOrchestrator):
             cls._check_and_execute_jobs_if_development_mode()
         else:
             if wait:
-                cls.__wait_until_job_finished(job, timeout=timeout)
+                cls._wait_until_job_finished(job, timeout=timeout)
 
         return job
 
@@ -182,23 +179,22 @@ class _Orchestrator(_AbstractOrchestrator):
             cls.jobs_to_run.put(job)
 
     @classmethod
-    def __wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: Optional[Union[float, int]] = None):
-        def __check_if_timeout(start, timeout):
-            if timeout:
-                return (datetime.now() - start).seconds < timeout
+    def _wait_until_job_finished(cls, jobs: Union[List[Job], Job], timeout: Optional[Union[float, int]] = None):
+        #  Note: this method should be prefixed by two underscores, but it has only one, so it can be mocked in tests.
+        def __check_if_timeout(st, to):
+            if to:
+                return (datetime.now() - st).seconds < to
             return True
 
         start = datetime.now()
         jobs = jobs if isinstance(jobs, Iterable) else [jobs]
         index = 0
-
         while __check_if_timeout(start, timeout) and index < len(jobs):
             try:
                 if jobs[index]._is_finished():
                     index = index + 1
                 else:
                     sleep(0.5)  # Limit CPU usage
-
             except Exception:
                 pass
 
@@ -217,7 +213,7 @@ class _Orchestrator(_AbstractOrchestrator):
         return any(not data_manager._get(dn.id).is_ready_for_reading for dn in input_data_nodes)
 
     @staticmethod
-    def __unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]):
+    def _unlock_edit_on_jobs_outputs(jobs: Union[Job, List[Job], Set[Job]]):
         jobs = [jobs] if isinstance(jobs, Job) else jobs
         for job in jobs:
             job._unlock_edit_on_outputs()
@@ -227,7 +223,7 @@ class _Orchestrator(_AbstractOrchestrator):
         if job.is_completed() or job.is_skipped():
             cls.__unblock_jobs()
         elif job.is_failed():
-            cls.__fail_subsequent_jobs(job)
+            cls._fail_subsequent_jobs(job)
 
     @classmethod
     def __unblock_jobs(cls):
@@ -259,8 +255,8 @@ class _Orchestrator(_AbstractOrchestrator):
                 to_cancel_or_abandon_jobs.update(cls.__find_subsequent_jobs(job.submit_id, set(job.task.output.keys())))
                 cls.__remove_blocked_jobs(to_cancel_or_abandon_jobs)
                 cls.__remove_jobs_to_run(to_cancel_or_abandon_jobs)
-                cls.__cancel_jobs(job.id, to_cancel_or_abandon_jobs)
-                cls.__unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs)
+                cls._cancel_jobs(job.id, to_cancel_or_abandon_jobs)
+                cls._unlock_edit_on_jobs_outputs(to_cancel_or_abandon_jobs)
 
     @classmethod
     def __find_subsequent_jobs(cls, submit_id, output_dn_config_ids: Set) -> Set[Job]:
@@ -292,7 +288,7 @@ class _Orchestrator(_AbstractOrchestrator):
         cls.jobs_to_run = new_jobs_to_run
 
     @classmethod
-    def __fail_subsequent_jobs(cls, failed_job: Job):
+    def _fail_subsequent_jobs(cls, failed_job: Job):
         with cls.lock:
             to_fail_or_abandon_jobs = set()
             to_fail_or_abandon_jobs.update(
@@ -303,10 +299,10 @@ class _Orchestrator(_AbstractOrchestrator):
             to_fail_or_abandon_jobs.update([failed_job])
             cls.__remove_blocked_jobs(to_fail_or_abandon_jobs)
             cls.__remove_jobs_to_run(to_fail_or_abandon_jobs)
-            cls.__unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs)
+            cls._unlock_edit_on_jobs_outputs(to_fail_or_abandon_jobs)
 
     @classmethod
-    def __cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
+    def _cancel_jobs(cls, job_id_to_cancel: JobId, jobs: Set[Job]):
         from ._orchestrator_factory import _OrchestratorFactory
 
         for job in jobs:

+ 208 - 0
tests/core/_orchestrator/test_orchestrator__submit_task.py

@@ -0,0 +1,208 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from datetime import datetime
+import freezegun
+
+from taipy.config import Config
+from taipy.core import taipy
+from taipy.core._orchestrator._orchestrator import _Orchestrator
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
+from taipy.core.submission.submission_status import SubmissionStatus
+
+
+def nothing(*args, **kwargs):
+    pass
+
+
+def create_scenario():
+    # dn_0 --> t1 --> dn_1 --> t2 --> dn_2 --> t3 --> dn_3
+    #                  \
+    #                   \--> t2_bis
+    dn_0 = Config.configure_data_node("dn_0", default_data=0)
+    dn_1 = Config.configure_data_node("dn_1")
+    dn_2 = Config.configure_data_node("dn_2")
+    dn_3 = Config.configure_data_node("dn_3")
+    t1 = Config.configure_task("t1", nothing, [dn_0], [dn_1], skippable=True)
+    t2 = Config.configure_task("t2", nothing, [dn_1], [dn_2])
+    t3 = Config.configure_task("t3", nothing, [dn_2], [dn_3])
+    t2_bis = Config.configure_task("t2bis", nothing, [dn_1], [])
+    sc_conf = Config.configure_scenario("scenario", [t1, t2, t3, t2_bis])
+    return taipy.create_scenario(sc_conf)
+
+
+def test_submit_task_development_mode():
+    scenario = create_scenario()
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+
+    submit_time = datetime.now()
+    with freezegun.freeze_time(submit_time):
+        job = orchestrator.submit_task(scenario.t1)  # t1 is executed directly in development mode
+
+    # task output should have been written
+    assert scenario.dn_1.last_edit_date == submit_time
+
+    # job exists and is correct
+    assert job.task == scenario.t1
+    assert not job.force
+    assert job.is_completed()
+    assert job.submit_entity_id == scenario.t1.id
+    assert job.creation_date == submit_time
+    assert job.stacktrace == []
+    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+
+    # submission is created and correct
+    all_submissions = _SubmissionManagerFactory._build_manager()._get_all()
+    assert len(all_submissions) == 1
+    assert all_submissions[0].creation_date == submit_time
+    assert all_submissions[0].submission_status == SubmissionStatus.COMPLETED
+    assert all_submissions[0].jobs == [job]
+    assert all_submissions[0].entity_id == scenario.t1.id
+    assert all_submissions[0].entity_type == "TASK"
+    assert all_submissions[0].entity_config_id == "t1"
+
+    # orchestrator state is correct
+    assert len(orchestrator.blocked_jobs) == 0
+    assert orchestrator.jobs_to_run.qsize() == 0
+
+
+def test_submit_task_development_mode_blocked_job():
+    scenario = create_scenario()
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+
+    submit_time = datetime.now()
+    with freezegun.freeze_time(submit_time):
+        job = orchestrator.submit_task(scenario.t2)  # t1 is executed directly in development mode
+
+    # task output should have been written
+    assert scenario.dn_2.edit_in_progress
+
+    # job exists and is correct
+    assert job.task == scenario.t2
+    assert not job.force
+    assert job.is_blocked()  # input data is not ready
+    assert job.submit_entity_id == scenario.t2.id
+    assert job.creation_date == submit_time
+    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert job.stacktrace == []
+
+    # submission is created and correct
+    submission = _SubmissionManagerFactory._build_manager()._get(job.submit_id)
+    assert submission.submission_status == SubmissionStatus.BLOCKED
+    assert submission.creation_date == submit_time
+    assert submission.jobs == [job]
+    assert submission.entity_id == scenario.t2.id
+    assert submission.entity_type == "TASK"
+    assert submission.entity_config_id == "t2"
+
+    # orchestrator state is correct
+    assert len(orchestrator.blocked_jobs) == 1
+    assert orchestrator.jobs_to_run.qsize() == 0
+
+
+def test_submit_task_standalone_mode():
+    Config.configure_job_executions(mode="standalone")
+    sc = create_scenario()
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+
+    submit_time = datetime.now()
+    with freezegun.freeze_time(submit_time):
+        job = orchestrator.submit_task(sc.t1)  # No dispatcher running. t1 is not executed in standalone mode.
+
+    # task output should NOT have been written
+    assert sc.dn_1.last_edit_date is None
+
+    # task output should be locked for edition
+    assert sc.dn_1.edit_in_progress
+
+    # job exists and is correct
+    assert job.creation_date == submit_time
+    assert job.task == sc.t1
+    assert not job.force
+    assert job.is_pending()
+    assert job.submit_entity_id == sc.t1.id
+    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert job.stacktrace == []
+
+    # submission is created and correct
+    submission = _SubmissionManagerFactory._build_manager()._get(job.submit_id)
+    assert submission.creation_date == submit_time
+    assert submission.submission_status == SubmissionStatus.PENDING
+    assert submission.jobs == [job]
+    assert submission.entity_id == sc.t1.id
+    assert submission.entity_type == "TASK"
+    assert submission.entity_config_id == "t1"
+
+    # orchestrator state is correct
+    assert len(orchestrator.blocked_jobs) == 0
+    assert orchestrator.jobs_to_run.qsize() == 1
+
+
+def test_submit_task_standalone_mode_blocked_job():
+    Config.configure_job_executions(mode="standalone")
+    sc = create_scenario()
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+
+    submit_time = datetime.now()
+    with freezegun.freeze_time(submit_time):
+        job = orchestrator.submit_task(sc.t2)  # No dispatcher running. t2 is not executed in standalone mode.
+
+    # task output should NOT have been written
+    assert sc.dn_2.last_edit_date == None
+
+    # task output should be locked for edition
+    assert sc.dn_2.edit_in_progress
+
+    # job exists and is correct
+    assert job.creation_date == submit_time
+    assert job.task == sc.t2
+    assert not job.force
+    assert job.is_blocked()  # input data is not ready
+    assert job.stacktrace == []
+    assert len(job._subscribers) == 2  # submission._update_submission_status and orchestrator._on_status_change
+    assert job.submit_entity_id == sc.t2.id
+
+    # submission is created and correct
+    submission = _SubmissionManagerFactory._build_manager()._get(job.submit_id)
+    assert submission.creation_date == submit_time
+    assert submission.submission_status == SubmissionStatus.BLOCKED
+    assert submission.jobs == [job]
+    assert submission.entity_id == sc.t2.id
+    assert submission.entity_type == "TASK"
+    assert submission.entity_config_id == "t2"
+
+    # orchestrator state is correct
+    assert len(orchestrator.blocked_jobs) == 1
+    assert orchestrator.jobs_to_run.qsize() == 0
+
+
+def test_submit_task_with_callbacks_and_force_and_wait():
+    Config.configure_job_executions(mode="standalone")
+    scenario = create_scenario()
+    orchestrator = _OrchestratorFactory._build_orchestrator()
+
+    # Mock the wait function
+    mock_is_called = []
+
+    def mock(job, timeout):
+        mock_is_called.append((job, timeout))
+
+    orchestrator._wait_until_job_finished = mock
+
+    job = orchestrator.submit_task(scenario.t1, callbacks=[nothing], force=True, wait=True, timeout=2)
+
+    # job exists and is correct
+    assert job.task == scenario.t1
+    assert job.force
+    assert len(job._subscribers) == 3  # nothing, _update_submission_status, and _on_status_change
+    assert len(mock_is_called) == 1
+    assert mock_is_called[0][0] == job
+    assert mock_is_called[0][1] == 2