浏览代码

Merge pull request #1837 from Avaiga/feature/#1704-storing-job-status-change-timestamp

Feature/#1704 - Store the status changes in a records
Đỗ Trường Giang 7 月之前
父节点
当前提交
15359f5a41

+ 0 - 3
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import datetime
 from typing import Optional
 
 from ...job.job import Job
@@ -45,7 +44,5 @@ class _DevelopmentJobDispatcher(_JobDispatcher):
         Parameters:
             job (Job^): The job to submit on an executor with an available worker.
         """
-        job.execution_started_at = datetime.datetime.now()
         rs = _TaskFunctionWrapper(job.id, job.task).execute()
         self._update_job_status(job, rs)
-        job.execution_ended_at = datetime.datetime.now()

+ 0 - 3
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import datetime
 import multiprocessing as mp
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
@@ -61,7 +60,6 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
         config_as_string = _TomlSerializer()._serialize(Config._applied_config)  # type: ignore[attr-defined]
 
-        job.execution_started_at = datetime.datetime.now()
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
         future.add_done_callback(partial(self._update_job_status_from_future, job))
 
@@ -70,4 +68,3 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._nb_available_workers += 1
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
         self._update_job_status(job, ft.result())
-        job.execution_ended_at = datetime.datetime.now()

+ 4 - 6
taipy/core/job/_job_converter.py

@@ -27,12 +27,11 @@ class _JobConverter(_AbstractConverter):
             job.id,
             job._task.id,
             job._status,
+            {status: timestamp.isoformat() for status, timestamp in job._status_change_records.items()},
             job._force,
             job.submit_id,
             job.submit_entity_id,
             job._creation_date.isoformat(),
-            job._execution_started_at.isoformat() if job._execution_started_at else None,
-            job._execution_ended_at.isoformat() if job._execution_ended_at else None,
             cls.__serialize_subscribers(job._subscribers),
             job._stacktrace,
             version=job._version,
@@ -52,12 +51,11 @@ class _JobConverter(_AbstractConverter):
         )
 
         job._status = model.status  # type: ignore
+        job._status_change_records = {
+            status: datetime.fromisoformat(timestamp) for status, timestamp in model.status_change_records.items()
+        }
         job._force = model.force  # type: ignore
         job._creation_date = datetime.fromisoformat(model.creation_date)  # type: ignore
-        job._execution_started_at = (
-            datetime.fromisoformat(model.execution_started_at) if model.execution_started_at else None
-        )
-        job._execution_ended_at = datetime.fromisoformat(model.execution_ended_at) if model.execution_ended_at else None
         for it in model.subscribers:
             try:
                 fct_module, fct_name = it.get("fct_module"), it.get("fct_name")

+ 4 - 7
taipy/core/job/_job_model.py

@@ -10,7 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
 
 from .._repository._base_taipy_model import _BaseModel
 from .job_id import JobId
@@ -22,12 +22,11 @@ class _JobModel(_BaseModel):
     id: JobId
     task_id: str
     status: Status
+    status_change_records: Dict[str, str]
     force: bool
     submit_id: str
     submit_entity_id: str
     creation_date: str
-    execution_started_at: Optional[str]
-    execution_ended_at: Optional[str]
     subscribers: List[Dict]
     stacktrace: List[str]
     version: str
@@ -38,12 +37,11 @@ class _JobModel(_BaseModel):
             id=data["id"],
             task_id=data["task_id"],
             status=Status._from_repr(data["status"]),
+            status_change_records=_BaseModel._deserialize_attribute(data["status_change_records"]),
             force=data["force"],
             submit_id=data["submit_id"],
             submit_entity_id=data["submit_entity_id"],
             creation_date=data["creation_date"],
-            execution_started_at=data["execution_started_at"],
-            execution_ended_at=data["execution_ended_at"],
             subscribers=_BaseModel._deserialize_attribute(data["subscribers"]),
             stacktrace=_BaseModel._deserialize_attribute(data["stacktrace"]),
             version=data["version"],
@@ -54,12 +52,11 @@ class _JobModel(_BaseModel):
             self.id,
             self.task_id,
             repr(self.status),
+            _BaseModel._serialize_attribute(self.status_change_records),
             self.force,
             self.submit_id,
             self.submit_entity_id,
             self.creation_date,
-            self.execution_started_at,
-            self.execution_ended_at,
             _BaseModel._serialize_attribute(self.subscribers),
             _BaseModel._serialize_attribute(self.stacktrace),
             self.version,

+ 99 - 22
taipy/core/job/job.py

@@ -12,7 +12,7 @@
 __all__ = ["Job"]
 
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Callable, List, Optional
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
 
 from taipy.logger._taipy_logger import _TaipyLogger
 
@@ -49,8 +49,8 @@ class Job(_Entity, _Labeled):
 
     Every time a task is submitted for execution, a new *Job* is created. A job represents a
     single execution of a task. It holds all the information related to the task execution,
-    including the **creation date**, the execution `Status^`, and the **stacktrace** of any
-    exception that may be raised by the user function.
+    including the **creation date**, the execution `Status^`, the timestamp of status changes,
+    and the **stacktrace** of any exception that may be raised by the user function.
 
     In addition, a job notifies scenario or sequence subscribers on its status change.
 
@@ -78,8 +78,7 @@ class Job(_Entity, _Labeled):
         self._creation_date = datetime.now()
         self._submit_id: str = submit_id
         self._submit_entity_id: str = submit_entity_id
-        self._execution_started_at: Optional[datetime] = None
-        self._execution_ended_at: Optional[datetime] = None
+        self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
         self._subscribers: List[Callable] = []
         self._stacktrace: List[str] = []
         self.__logger = _TaipyLogger._get_logger()
@@ -134,6 +133,7 @@ class Job(_Entity, _Labeled):
     @status.setter  # type: ignore
     @_self_setter(_MANAGER_NAME)
     def status(self, val):
+        self._status_change_records[val.name] = datetime.now()
         self._status = val
 
     @property  # type: ignore
@@ -148,36 +148,113 @@ class Job(_Entity, _Labeled):
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def execution_started_at(self) -> Optional[datetime]:
-        return self._execution_started_at
+    def submitted_at(self) -> datetime:
+        """Get the date time when the job was submitted.
 
-    @execution_started_at.setter
-    @_self_setter(_MANAGER_NAME)
-    def execution_started_at(self, val):
-        self._execution_started_at = val
+        Returns:
+            datetime: The date time when the job was submitted.
+        """
+        return self._status_change_records["SUBMITTED"]
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def execution_ended_at(self) -> Optional[datetime]:
-        return self._execution_ended_at
+    def run_at(self) -> Optional[datetime]:
+        """Get the date time when the job was run.
 
-    @execution_ended_at.setter
-    @_self_setter(_MANAGER_NAME)
-    def execution_ended_at(self, val):
-        self._execution_ended_at = val
+        Returns:
+            Optional[datetime]: The date time when the job was run.
+                If the job is not run, None is returned.
+        """
+        return self._status_change_records.get(Status.RUNNING.name, None)
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def finished_at(self) -> Optional[datetime]:
+        """Get the date time when the job was finished.
+
+        Returns:
+            Optional[datetime]: The date time when the job was finished.
+                If the job is not finished, None is returned.
+        """
+        if self.is_finished():
+            if self.is_completed():
+                return self._status_change_records[Status.COMPLETED.name]
+            elif self.is_failed():
+                return self._status_change_records[Status.FAILED.name]
+            elif self.is_canceled():
+                return self._status_change_records[Status.CANCELED.name]
+            elif self.is_skipped():
+                return self._status_change_records[Status.SKIPPED.name]
+            elif self.is_abandoned():
+                return self._status_change_records[Status.ABANDONED.name]
+
+        return None
 
     @property
     @_self_reload(_MANAGER_NAME)
     def execution_duration(self) -> Optional[float]:
         """Get the duration of the job execution in seconds.
+        The execution time is the duration from the job running to the job completion.
 
         Returns:
-            Optional[float]: The duration of the job execution in seconds. If the job is not
-            completed, None is returned.
+            Optional[float]: The duration of the job execution in seconds.
+                - If the job was not run, None is returned.
+                - If the job is not finished, the execution time is the duration
+                  from the running time to the current time.
         """
-        if self._execution_started_at and self._execution_ended_at:
-            return (self._execution_ended_at - self._execution_started_at).total_seconds()
-        return None
+        if Status.RUNNING.name not in self._status_change_records:
+            return None
+
+        if self.is_finished():
+            return (self.finished_at - self._status_change_records[Status.RUNNING.name]).total_seconds()
+
+        return (datetime.now() - self._status_change_records[Status.RUNNING.name]).total_seconds()
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def pending_duration(self) -> Optional[float]:
+        """Get the duration of the job in the pending state in seconds.
+
+        Returns:
+            Optional[float]: The duration of the job in the pending state in seconds.
+                - If the job is not running, None is returned.
+                - If the job is not pending, the pending time is the duration
+                  from the submission to the current time.
+        """
+        if Status.PENDING.name not in self._status_change_records:
+            return None
+
+        if self.is_finished() or self.is_running():
+            return (
+                self._status_change_records[Status.RUNNING.name] - self._status_change_records[Status.PENDING.name]
+            ).total_seconds()
+
+        return (datetime.now() - self._status_change_records[Status.PENDING.name]).total_seconds()
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def blocked_duration(self) -> Optional[float]:
+        """Get the duration of the job in the blocked state in seconds.
+
+        Returns:
+            Optional[float]: The duration of the job in the blocked state in seconds.
+                - If the job is not running, None is returned.
+                - If the job is not blocked, the blocked time is the duration
+                  from the submission to the current time.
+        """
+        if Status.BLOCKED.name not in self._status_change_records:
+            return None
+
+        if Status.PENDING.name in self._status_change_records:
+            return (
+                self._status_change_records[Status.PENDING.name] - self._status_change_records[Status.BLOCKED.name]
+            ).total_seconds()
+        if self.is_finished():
+            return (self.finished_at - self._status_change_records[Status.BLOCKED.name]).total_seconds()
+
+        # If pending time is not recorded, and the job is not finished, the only possible status left is blocked
+        # which means the current status is blocked.
+        return (datetime.now() - self._status_change_records[Status.BLOCKED.name]).total_seconds()
 
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)

+ 0 - 27
taipy/core/submission/submission.py

@@ -139,33 +139,6 @@ class Submission(_Entity, _Labeled):
     def creation_date(self):
         return self._creation_date
 
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_started_at(self) -> Optional[datetime]:
-        if all(job.execution_started_at is not None for job in self.jobs):
-            return min(job.execution_started_at for job in self.jobs)
-        return None
-
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_ended_at(self) -> Optional[datetime]:
-        if all(job.execution_ended_at is not None for job in self.jobs):
-            return max(job.execution_ended_at for job in self.jobs)
-        return None
-
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_duration(self) -> Optional[float]:
-        """Get the duration of the submission in seconds.
-
-        Returns:
-            Optional[float]: The duration of the submission in seconds. If the job is not
-            completed, None is returned.
-        """
-        if self.execution_started_at and self.execution_ended_at:
-            return (self.execution_ended_at - self.execution_started_at).total_seconds()
-        return None
-
     def get_label(self) -> str:
         """Returns the submission simple label prefixed by its owner label.
 

+ 9 - 15
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -535,17 +535,14 @@ def test_submit_duration_development_mode():
     jobs = submission.jobs
     orchestrator.stop()
 
-    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
-    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    assert all(isinstance(job.submitted_at, datetime) for job in jobs)
+    assert all(isinstance(job.run_at, datetime) for job in jobs)
+    assert all(isinstance(job.finished_at, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1
     assert jobs_2s.execution_duration >= 2
 
-    assert submission.execution_duration >= 3
-    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
-    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
-
 
 @pytest.mark.standalone
 def test_submit_duration_standalone_mode():
@@ -562,19 +559,16 @@ def test_submit_duration_standalone_mode():
     scenario = Scenario("scenario", {task_1, task_2}, {})
     _ScenarioManager._set(scenario)
     submission = taipy.submit(scenario)
-    jobs = submission.jobs
-
-    assert_true_after_time(jobs[1].is_completed)
 
+    assert_true_after_time(lambda: all(job is not None and job.is_completed() for job in submission.jobs))
     orchestrator.stop()
 
-    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
-    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    jobs = submission.jobs
+
+    assert all(isinstance(job.submitted_at, datetime) for job in jobs)
+    assert all(isinstance(job.run_at, datetime) for job in jobs)
+    assert all(isinstance(job.finished_at, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1
     assert jobs_2s.execution_duration >= 2
-
-    assert submission.execution_duration >= 2  # Both tasks are executed in parallel so the duration may smaller than 3
-    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
-    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)

+ 62 - 1
tests/core/job/test_job.py

@@ -9,12 +9,13 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-from datetime import timedelta
+from datetime import datetime, timedelta
 from time import sleep
 from typing import Union, cast
 from unittest import mock
 from unittest.mock import MagicMock
 
+import freezegun
 import pytest
 
 from taipy.config.common.scope import Scope
@@ -323,6 +324,66 @@ def test_auto_set_and_reload(current_datetime, job_id):
     assert not job_1._is_in_context
 
 
+def test_status_records(job_id):
+    task_1 = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))
+    submission = _SubmissionManagerFactory._build_manager()._create(task_1.id, task_1._ID_PREFIX, task_1.config_id)
+    with freezegun.freeze_time("2024-09-25 13:30:30"):
+        job_1 = Job(job_id, task_1, submission.id, "scenario_entity_id")
+    submission.jobs = [job_1]
+
+    _TaskManager._set(task_1)
+    _JobManager._set(job_1)
+
+    assert job_1._status_change_records == {"SUBMITTED": datetime(2024, 9, 25, 13, 30, 30)}
+    assert job_1.submitted_at == datetime(2024, 9, 25, 13, 30, 30)
+    assert job_1.execution_duration is None
+
+    with freezegun.freeze_time("2024-09-25 13:35:30"):
+        job_1.blocked()
+    assert job_1._status_change_records == {
+        "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+    }
+    assert job_1.execution_duration is None
+    with freezegun.freeze_time("2024-09-25 13:36:00"):
+        assert job_1.blocked_duration == 30  # = 13:36:00 - 13:35:30
+
+    with freezegun.freeze_time("2024-09-25 13:40:30"):
+        job_1.pending()
+    assert job_1._status_change_records == {
+        "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
+    }
+    assert job_1.execution_duration is None
+    with freezegun.freeze_time("2024-09-25 13:41:00"):
+        assert job_1.pending_duration == 30  # = 13:41:00 - 13:40:30
+
+    with freezegun.freeze_time("2024-09-25 13:50:30"):
+        job_1.running()
+    assert job_1._status_change_records == {
+        "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
+        "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
+    }
+    assert job_1.run_at == datetime(2024, 9, 25, 13, 50, 30)
+    assert job_1.blocked_duration == 300  # = 13:40:30 - 13:35:30
+    assert job_1.pending_duration == 600  # = 13:50:30 - 13:40:30
+    assert job_1.execution_duration > 0
+
+    with freezegun.freeze_time("2024-09-25 13:56:35"):
+        job_1.completed()
+    assert job_1._status_change_records == {
+        "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
+        "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
+        "COMPLETED": datetime(2024, 9, 25, 13, 56, 35),
+    }
+    assert job_1.execution_duration == 365  # = 13:56:35 - 13:50:30
+
+
 def test_is_deletable():
     with mock.patch("taipy.core.job._job_manager._JobManager._is_deletable") as mock_submit:
         task = Task(config_id="name_1", properties={}, function=_foo, id=TaskId("task_1"))

+ 3 - 3
tests/core/notification/test_events_published.py

@@ -178,16 +178,16 @@ def test_events_published_for_scenario_submission():
     # 1 submission update event for is_completed
     scenario.submit()
     snapshot = all_evts.capture()
-    assert len(snapshot.collected_events) == 19
+    assert len(snapshot.collected_events) == 17
     assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
     assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
     assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
-    assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 6
+    assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
     assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 5
     assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
-    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 16
+    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 14
     assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1
 
     assert snapshot.attr_name_collected["last_edit_date"] == 1

+ 0 - 28
tests/core/submission/test_submission.py

@@ -903,31 +903,3 @@ def test_is_finished():
     submission.submission_status = SubmissionStatus.COMPLETED
     assert submission.submission_status == SubmissionStatus.COMPLETED
     assert submission.is_finished()
-
-
-def test_execution_duration():
-    task = Task(config_id="task_1", properties={}, function=print, id=TaskId("task_1"))
-    submission = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
-    job_1 = Job("job_1", task, submission.id, submission.entity_id)
-    job_2 = Job("job_2", task, submission.id, submission.entity_id)
-
-    _TaskManagerFactory._build_manager()._set(task)
-    _SubmissionManagerFactory._build_manager()._set(submission)
-    _JobManagerFactory._build_manager()._set(job_1)
-    _JobManagerFactory._build_manager()._set(job_2)
-
-    submission.jobs = [job_1, job_2]
-    _SubmissionManagerFactory._build_manager()._set(submission)
-
-    job_1.execution_started_at = datetime(2024, 1, 1, 0, 0, 0)
-    job_1.execution_ended_at = datetime(2024, 1, 1, 0, 0, 10)
-    job_2.execution_started_at = datetime(2024, 1, 1, 0, 1, 0)
-    job_2.execution_ended_at = datetime(2024, 1, 1, 0, 2, 30)
-    assert submission.execution_started_at == job_1.execution_started_at
-    assert submission.execution_ended_at == job_2.execution_ended_at
-    assert submission.execution_duration == 150
-
-    job_2.execution_ended_at = None  # job_2 is still running
-    assert submission.execution_started_at == job_1.execution_started_at
-    assert submission.execution_ended_at is None
-    assert submission.execution_duration is None