浏览代码

feat: add execution duration as a information field in Job and Submission entities

trgiangdo 9 月之前
父节点
当前提交
808dba504d

+ 3 - 0
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -9,6 +9,7 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+import datetime
 from typing import Optional
 
 from ...job.job import Job
@@ -44,5 +45,7 @@ class _DevelopmentJobDispatcher(_JobDispatcher):
         Parameters:
             job (Job^): The job to submit on an executor with an available worker.
         """
+        job.execution_started_at = datetime.datetime.now()
         rs = _TaskFunctionWrapper(job.id, job.task).execute()
         self._update_job_status(job, rs)
+        job.execution_ended_at = datetime.datetime.now()

+ 4 - 0
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -9,6 +9,7 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+import datetime
 import multiprocessing as mp
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
@@ -59,6 +60,8 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._nb_available_workers -= 1
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
         config_as_string = _TomlSerializer()._serialize(Config._applied_config)  # type: ignore[attr-defined]
+
+        job.execution_started_at = datetime.datetime.now()
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
         future.add_done_callback(partial(self._update_job_status_from_future, job))
 
@@ -67,3 +70,4 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._nb_available_workers += 1
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
         self._update_job_status(job, ft.result())
+        job.execution_ended_at = datetime.datetime.now()

+ 6 - 0
taipy/core/job/_job_converter.py

@@ -31,6 +31,8 @@ class _JobConverter(_AbstractConverter):
             job.submit_id,
             job.submit_entity_id,
             job._creation_date.isoformat(),
+            job._execution_started_at.isoformat() if job._execution_started_at else None,
+            job._execution_ended_at.isoformat() if job._execution_ended_at else None,
             cls.__serialize_subscribers(job._subscribers),
             job._stacktrace,
             version=job._version,
@@ -52,6 +54,10 @@ class _JobConverter(_AbstractConverter):
         job._status = model.status  # type: ignore
         job._force = model.force  # type: ignore
         job._creation_date = datetime.fromisoformat(model.creation_date)  # type: ignore
+        job._execution_started_at = (
+            datetime.fromisoformat(model.execution_started_at) if model.execution_started_at else None
+        )
+        job._execution_ended_at = datetime.fromisoformat(model.execution_ended_at) if model.execution_ended_at else None
         for it in model.subscribers:
             try:
                 fct_module, fct_name = it.get("fct_module"), it.get("fct_name")

+ 6 - 0
taipy/core/job/_job_model.py

@@ -26,6 +26,8 @@ class _JobModel(_BaseModel):
     submit_id: str
     submit_entity_id: str
     creation_date: str
+    execution_started_at: str
+    execution_ended_at: str
     subscribers: List[Dict]
     stacktrace: List[str]
     version: str
@@ -40,6 +42,8 @@ class _JobModel(_BaseModel):
             submit_id=data["submit_id"],
             submit_entity_id=data["submit_entity_id"],
             creation_date=data["creation_date"],
+            execution_started_at=data["execution_started_at"],
+            execution_ended_at=data["execution_ended_at"],
             subscribers=_BaseModel._deserialize_attribute(data["subscribers"]),
             stacktrace=_BaseModel._deserialize_attribute(data["stacktrace"]),
             version=data["version"],
@@ -54,6 +58,8 @@ class _JobModel(_BaseModel):
             self.submit_id,
             self.submit_entity_id,
             self.creation_date,
+            self.execution_started_at,
+            self.execution_ended_at,
             _BaseModel._serialize_attribute(self.subscribers),
             _BaseModel._serialize_attribute(self.stacktrace),
             self.version,

+ 35 - 0
taipy/core/job/job.py

@@ -78,6 +78,8 @@ class Job(_Entity, _Labeled):
         self._creation_date = datetime.now()
         self._submit_id: str = submit_id
         self._submit_entity_id: str = submit_entity_id
+        self._execution_started_at: Optional[datetime] = None
+        self._execution_ended_at: Optional[datetime] = None
         self._subscribers: List[Callable] = []
         self._stacktrace: List[str] = []
         self.__logger = _TaipyLogger._get_logger()
@@ -144,6 +146,39 @@ class Job(_Entity, _Labeled):
     def creation_date(self, val):
         self._creation_date = val
 
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_started_at(self) -> Optional[datetime]:
+        return self._execution_started_at
+
+    @execution_started_at.setter
+    @_self_setter(_MANAGER_NAME)
+    def execution_started_at(self, val):
+        self._execution_started_at = val
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_ended_at(self) -> Optional[datetime]:
+        return self._execution_ended_at
+
+    @execution_ended_at.setter
+    @_self_setter(_MANAGER_NAME)
+    def execution_ended_at(self, val):
+        self._execution_ended_at = val
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_duration(self) -> Optional[float]:
+        """Get the duration of the job execution in seconds.
+
+        Returns:
+            Optional[float]: The duration of the job execution in seconds. If the job is not
+            completed, None is returned.
+        """
+        if self._execution_started_at and self._execution_ended_at:
+            return (self._execution_ended_at - self._execution_started_at).total_seconds()
+        return None
+
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
     def stacktrace(self) -> List[str]:

+ 27 - 0
taipy/core/submission/submission.py

@@ -138,6 +138,33 @@ class Submission(_Entity, _Labeled):
     def creation_date(self):
         return self._creation_date
 
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_started_at(self) -> Optional[datetime]:
+        if all(job.execution_started_at is not None for job in self.jobs):
+            return min(job.execution_started_at for job in self.jobs)
+        return None
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_ended_at(self) -> Optional[datetime]:
+        if all(job.execution_ended_at is not None for job in self.jobs):
+            return max(job.execution_ended_at for job in self.jobs)
+        return None
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def execution_duration(self) -> Optional[float]:
+        """Get the duration of the submission in seconds.
+
+        Returns:
+            Optional[float]: The duration of the submission in seconds. If the job is not
+            completed, None is returned.
+        """
+        if self.execution_started_at and self.execution_ended_at:
+            return (self.execution_ended_at - self.execution_started_at).total_seconds()
+        return None
+
     def get_label(self) -> str:
         """Returns the submission simple label prefixed by its owner label.
 

+ 74 - 1
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -10,6 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 from datetime import datetime, timedelta
+from time import sleep
 from unittest import mock
 
 import freezegun
@@ -17,7 +18,7 @@ import pytest
 
 from taipy import Scenario, Scope, Task
 from taipy.config import Config
-from taipy.core import taipy
+from taipy.core import Core, taipy
 from taipy.core._orchestrator._orchestrator import _Orchestrator
 from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
 from taipy.core.config import JobConfig
@@ -27,6 +28,7 @@ from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.task._task_manager import _TaskManager
+from tests.core.utils import assert_true_after_time
 
 
 def nothing(*args, **kwargs):
@@ -53,6 +55,7 @@ def test_submit_scenario_development_mode():
     scenario = create_scenario()
     scenario.dn_0.write(0)  # input data is made ready
     orchestrator = _OrchestratorFactory._build_orchestrator()
+    _OrchestratorFactory._build_dispatcher()
 
     submit_time = datetime.now() + timedelta(seconds=1)  # +1 to ensure the edit time of dn_0 is before the submit time
     with freezegun.freeze_time(submit_time):
@@ -505,3 +508,73 @@ def test_submit_submittable_generate_unique_submit_id():
     assert jobs_1[0].submit_id == jobs_1[1].submit_id
     assert jobs_2[0].submit_id == jobs_2[1].submit_id
     assert jobs_1[0].submit_id != jobs_2[0].submit_id
+
+
+def task_sleep_1():
+    sleep(1)
+
+
+def task_sleep_2():
+    sleep(2)
+    return
+
+
+def test_submit_duration_development_mode():
+    core = Core()
+    core.run()
+
+    task_1 = Task("task_config_id_1", {}, task_sleep_1, [], [])
+    task_2 = Task("task_config_id_2", {}, task_sleep_2, [], [])
+
+    _TaskManager._set(task_1)
+    _TaskManager._set(task_2)
+
+    scenario = Scenario("scenario", {task_1, task_2}, {})
+    _ScenarioManager._set(scenario)
+    submission = taipy.submit(scenario)
+    jobs = submission.jobs
+    core.stop()
+
+    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
+    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
+    jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
+    assert jobs_1s.execution_duration >= 1
+    assert jobs_2s.execution_duration >= 2
+
+    assert submission.execution_duration >= 3
+    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
+    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
+
+
+@pytest.mark.standalone
+def test_submit_duration_standalone_mode():
+    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE)
+    core = Core()
+    core.run()
+
+    task_1 = Task("task_config_id_1", {}, task_sleep_1, [], [])
+    task_2 = Task("task_config_id_2", {}, task_sleep_2, [], [])
+
+    _TaskManager._set(task_1)
+    _TaskManager._set(task_2)
+
+    scenario = Scenario("scenario", {task_1, task_2}, {})
+    _ScenarioManager._set(scenario)
+    submission = taipy.submit(scenario)
+    jobs = submission.jobs
+
+    assert_true_after_time(jobs[1].is_completed)
+
+    core.stop()
+
+    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
+    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
+    jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
+    assert jobs_1s.execution_duration >= 1
+    assert jobs_2s.execution_duration >= 2
+
+    assert submission.execution_duration >= 2  # Both tasks are executed in parallel so the duration may smaller than 3
+    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
+    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)

+ 28 - 0
tests/core/submission/test_submission.py

@@ -903,3 +903,31 @@ def test_is_finished():
     submission.submission_status = SubmissionStatus.COMPLETED
     assert submission.submission_status == SubmissionStatus.COMPLETED
     assert submission.is_finished()
+
+
+def test_execution_duration():
+    task = Task(config_id="task_1", properties={}, function=print, id=TaskId("task_1"))
+    submission = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
+    job_1 = Job("job_1", task, submission.id, submission.entity_id)
+    job_2 = Job("job_2", task, submission.id, submission.entity_id)
+
+    _TaskManagerFactory._build_manager()._set(task)
+    _SubmissionManagerFactory._build_manager()._set(submission)
+    _JobManagerFactory._build_manager()._set(job_1)
+    _JobManagerFactory._build_manager()._set(job_2)
+
+    submission.jobs = [job_1, job_2]
+    _SubmissionManagerFactory._build_manager()._set(submission)
+
+    job_1.execution_started_at = datetime(2024, 1, 1, 0, 0, 0)
+    job_1.execution_ended_at = datetime(2024, 1, 1, 0, 0, 10)
+    job_2.execution_started_at = datetime(2024, 1, 1, 0, 1, 0)
+    job_2.execution_ended_at = datetime(2024, 1, 1, 0, 2, 30)
+    assert submission.execution_started_at == job_1.execution_started_at
+    assert submission.execution_ended_at == job_2.execution_ended_at
+    assert submission.execution_duration == 150
+
+    job_2.execution_ended_at = None  # job_2 is still running
+    assert submission.execution_started_at == job_1.execution_started_at
+    assert submission.execution_ended_at is None
+    assert submission.execution_duration is None