Browse Source

feat: store the status changes in a dict records instead of execution_start and end

trgiangdo 7 months ago
parent
commit
53a3c2946a

+ 0 - 3
taipy/core/_orchestrator/_dispatcher/_development_job_dispatcher.py

@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import datetime
 from typing import Optional
 
 from ...job.job import Job
@@ -45,7 +44,5 @@ class _DevelopmentJobDispatcher(_JobDispatcher):
         Parameters:
             job (Job^): The job to submit on an executor with an available worker.
         """
-        job.execution_started_at = datetime.datetime.now()
         rs = _TaskFunctionWrapper(job.id, job.task).execute()
         self._update_job_status(job, rs)
-        job.execution_ended_at = datetime.datetime.now()

+ 0 - 3
taipy/core/_orchestrator/_dispatcher/_standalone_job_dispatcher.py

@@ -9,7 +9,6 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
-import datetime
 import multiprocessing as mp
 from concurrent.futures import Executor, ProcessPoolExecutor
 from functools import partial
@@ -61,7 +60,6 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the dispatch method.")
         config_as_string = _TomlSerializer()._serialize(Config._applied_config)  # type: ignore[attr-defined]
 
-        job.execution_started_at = datetime.datetime.now()
         future = self._executor.submit(_TaskFunctionWrapper(job.id, job.task), config_as_string=config_as_string)
         future.add_done_callback(partial(self._update_job_status_from_future, job))
 
@@ -70,4 +68,3 @@ class _StandaloneJobDispatcher(_JobDispatcher):
             self._nb_available_workers += 1
             self._logger.debug(f"Setting nb_available_workers to {self._nb_available_workers} in the callback method.")
         self._update_job_status(job, ft.result())
-        job.execution_ended_at = datetime.datetime.now()

+ 4 - 6
taipy/core/job/_job_converter.py

@@ -27,12 +27,11 @@ class _JobConverter(_AbstractConverter):
             job.id,
             job._task.id,
             job._status,
+            {status: timestamp.isoformat() for status, timestamp in job._status_change_records.items()},
             job._force,
             job.submit_id,
             job.submit_entity_id,
             job._creation_date.isoformat(),
-            job._execution_started_at.isoformat() if job._execution_started_at else None,
-            job._execution_ended_at.isoformat() if job._execution_ended_at else None,
             cls.__serialize_subscribers(job._subscribers),
             job._stacktrace,
             version=job._version,
@@ -52,12 +51,11 @@ class _JobConverter(_AbstractConverter):
         )
 
         job._status = model.status  # type: ignore
+        job._status_change_records = {
+            status: datetime.fromisoformat(timestamp) for status, timestamp in model.status_change_records.items()
+        }
         job._force = model.force  # type: ignore
         job._creation_date = datetime.fromisoformat(model.creation_date)  # type: ignore
-        job._execution_started_at = (
-            datetime.fromisoformat(model.execution_started_at) if model.execution_started_at else None
-        )
-        job._execution_ended_at = datetime.fromisoformat(model.execution_ended_at) if model.execution_ended_at else None
         for it in model.subscribers:
             try:
                 fct_module, fct_name = it.get("fct_module"), it.get("fct_name")

+ 4 - 7
taipy/core/job/_job_model.py

@@ -10,7 +10,7 @@
 # specific language governing permissions and limitations under the License.
 
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List
 
 from .._repository._base_taipy_model import _BaseModel
 from .job_id import JobId
@@ -22,12 +22,11 @@ class _JobModel(_BaseModel):
     id: JobId
     task_id: str
     status: Status
+    status_change_records: Dict[str, str]
     force: bool
     submit_id: str
     submit_entity_id: str
     creation_date: str
-    execution_started_at: Optional[str]
-    execution_ended_at: Optional[str]
     subscribers: List[Dict]
     stacktrace: List[str]
     version: str
@@ -38,12 +37,11 @@ class _JobModel(_BaseModel):
             id=data["id"],
             task_id=data["task_id"],
             status=Status._from_repr(data["status"]),
+            status_change_records=_BaseModel._deserialize_attribute(data["status_change_records"]),
             force=data["force"],
             submit_id=data["submit_id"],
             submit_entity_id=data["submit_entity_id"],
             creation_date=data["creation_date"],
-            execution_started_at=data["execution_started_at"],
-            execution_ended_at=data["execution_ended_at"],
             subscribers=_BaseModel._deserialize_attribute(data["subscribers"]),
             stacktrace=_BaseModel._deserialize_attribute(data["stacktrace"]),
             version=data["version"],
@@ -54,12 +52,11 @@ class _JobModel(_BaseModel):
             self.id,
             self.task_id,
             repr(self.status),
+            _BaseModel._serialize_attribute(self.status_change_records),
             self.force,
             self.submit_id,
             self.submit_entity_id,
             self.creation_date,
-            self.execution_started_at,
-            self.execution_ended_at,
             _BaseModel._serialize_attribute(self.subscribers),
             _BaseModel._serialize_attribute(self.stacktrace),
             self.version,

+ 101 - 22
taipy/core/job/job.py

@@ -12,7 +12,7 @@
 __all__ = ["Job"]
 
 from datetime import datetime
-from typing import TYPE_CHECKING, Any, Callable, List, Optional
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
 
 from taipy.logger._taipy_logger import _TaipyLogger
 
@@ -49,8 +49,8 @@ class Job(_Entity, _Labeled):
 
     Every time a task is submitted for execution, a new *Job* is created. A job represents a
     single execution of a task. It holds all the information related to the task execution,
-    including the **creation date**, the execution `Status^`, and the **stacktrace** of any
-    exception that may be raised by the user function.
+    including the **creation date**, the execution `Status^`, the timestamp of status changes,
+    and the **stacktrace** of any exception that may be raised by the user function.
 
     In addition, a job notifies scenario or sequence subscribers on its status change.
 
@@ -78,8 +78,7 @@ class Job(_Entity, _Labeled):
         self._creation_date = datetime.now()
         self._submit_id: str = submit_id
         self._submit_entity_id: str = submit_entity_id
-        self._execution_started_at: Optional[datetime] = None
-        self._execution_ended_at: Optional[datetime] = None
+        self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
         self._subscribers: List[Callable] = []
         self._stacktrace: List[str] = []
         self.__logger = _TaipyLogger._get_logger()
@@ -134,6 +133,7 @@ class Job(_Entity, _Labeled):
     @status.setter  # type: ignore
     @_self_setter(_MANAGER_NAME)
     def status(self, val):
+        self._status_change_records[val.name] = datetime.now()
         self._status = val
 
     @property  # type: ignore
@@ -148,36 +148,107 @@ class Job(_Entity, _Labeled):
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def execution_started_at(self) -> Optional[datetime]:
-        return self._execution_started_at
+    def submitted_time(self) -> datetime:
+        """Get the date time when the job was submitted.
 
-    @execution_started_at.setter
-    @_self_setter(_MANAGER_NAME)
-    def execution_started_at(self, val):
-        self._execution_started_at = val
+        Returns:
+            datetime: The date time when the job was submitted.
+        """
+        return self._status_change_records["SUBMITTED"]
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def execution_ended_at(self) -> Optional[datetime]:
-        return self._execution_ended_at
+    def run_time(self) -> Optional[datetime]:
+        """Get the date time when the job was run.
 
-    @execution_ended_at.setter
-    @_self_setter(_MANAGER_NAME)
-    def execution_ended_at(self, val):
-        self._execution_ended_at = val
+        Returns:
+            Optional[datetime]: The date time when the job was run.
+                If the job is not run, None is returned.
+        """
+        return self._status_change_records.get("RUNNING", None)
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def finished_time(self) -> Optional[datetime]:
+        """Get the date time when the job was finished.
+
+        Returns:
+            Optional[datetime]: The date time when the job was finished.
+                If the job is not finished, None is returned.
+        """
+        if self.is_finished():
+            if self.is_completed():
+                return self._status_change_records["COMPLETED"]
+            elif self.is_failed():
+                return self._status_change_records["FAILED"]
+            elif self.is_canceled():
+                return self._status_change_records["CANCELED"]
+            elif self.is_skipped():
+                return self._status_change_records["SKIPPED"]
+            elif self.is_abandoned():
+                return self._status_change_records["ABANDONED"]
+
+        return None
 
     @property
     @_self_reload(_MANAGER_NAME)
     def execution_duration(self) -> Optional[float]:
         """Get the duration of the job execution in seconds.
+        The execution time is the duration from the job running to the job completion.
 
         Returns:
-            Optional[float]: The duration of the job execution in seconds. If the job is not
-            completed, None is returned.
+            Optional[float]: The duration of the job execution in seconds.
+                - If the job was not run, None is returned.
+                - If the job is not finished, the execution time is the duration
+                  from the running time to the current time.
         """
-        if self._execution_started_at and self._execution_ended_at:
-            return (self._execution_ended_at - self._execution_started_at).total_seconds()
-        return None
+        if "RUNNING" not in self._status_change_records:
+            return None
+
+        if self.is_finished():
+            return (self.finished_time - self._status_change_records["RUNNING"]).total_seconds()
+
+        return (datetime.now() - self._status_change_records["RUNNING"]).total_seconds()
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def pending_time(self) -> Optional[float]:
+        """Get the duration of the job in the pending state in seconds.
+
+        Returns:
+            Optional[float]: The duration of the job in the pending state in seconds.
+                - If the job is not running, None is returned.
+                - If the job is not pending, the pending time is the duration
+                  from the submission to the current time.
+        """
+        if "PENDING" not in self._status_change_records:
+            return None
+
+        if self.is_finished() or self.is_running():
+            return (self._status_change_records["RUNNING"] - self._status_change_records["PENDING"]).total_seconds()
+
+        return (datetime.now() - self._status_change_records["PENDING"]).total_seconds()
+
+    @property
+    @_self_reload(_MANAGER_NAME)
+    def blocked_time(self) -> Optional[float]:
+        """Get the duration of the job in the blocked state in seconds.
+
+        Returns:
+            Optional[float]: The duration of the job in the blocked state in seconds.
+                - If the job is not running, None is returned.
+                - If the job is not blocked, the blocked time is the duration
+                  from the submission to the current time.
+        """
+        if "BLOCKED" not in self._status_change_records:
+            return None
+
+        if self.is_finished():
+            return (self.finished_time - self._status_change_records["BLOCKED"]).total_seconds()
+        if self.is_running():
+            return (self._status_change_records["RUNNING"] - self._status_change_records["BLOCKED"]).total_seconds()
+
+        return (datetime.now() - self._status_change_records["BLOCKED"]).total_seconds()
 
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
@@ -214,42 +285,50 @@ class Job(_Entity, _Labeled):
     @_run_callbacks
     def blocked(self):
         """Set the status to _blocked_ and notify subscribers."""
+        self._status_change_records["BLOCKED"] = datetime.now()
         self.status = Status.BLOCKED
 
     @_run_callbacks
     def pending(self):
         """Set the status to _pending_ and notify subscribers."""
+        self._status_change_records["PENDING"] = datetime.now()
         self.status = Status.PENDING
 
     @_run_callbacks
     def running(self):
         """Set the status to _running_ and notify subscribers."""
+        self._status_change_records["RUNNING"] = datetime.now()
         self.status = Status.RUNNING
 
     @_run_callbacks
     def canceled(self):
         """Set the status to _canceled_ and notify subscribers."""
+        self._status_change_records["CANCELED"] = datetime.now()
         self.status = Status.CANCELED
 
     @_run_callbacks
     def abandoned(self):
         """Set the status to _abandoned_ and notify subscribers."""
+        self._status_change_records["ABANDONED"] = datetime.now()
         self.status = Status.ABANDONED
 
     @_run_callbacks
     def failed(self):
         """Set the status to _failed_ and notify subscribers."""
+        self._status_change_records["FAILED"] = datetime.now()
         self.status = Status.FAILED
 
     @_run_callbacks
     def completed(self):
         """Set the status to _completed_ and notify subscribers."""
+        self._status_change_records["COMPLETED"] = datetime.now()
         self.status = Status.COMPLETED
         self.__logger.info(f"job {self.id} is completed.")
 
     @_run_callbacks
     def skipped(self):
         """Set the status to _skipped_ and notify subscribers."""
+        self._status_change_records["SKIPPED"] = datetime.now()
         self.status = Status.SKIPPED
 
     def is_failed(self) -> bool:

+ 0 - 27
taipy/core/submission/submission.py

@@ -139,33 +139,6 @@ class Submission(_Entity, _Labeled):
     def creation_date(self):
         return self._creation_date
 
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_started_at(self) -> Optional[datetime]:
-        if all(job.execution_started_at is not None for job in self.jobs):
-            return min(job.execution_started_at for job in self.jobs)
-        return None
-
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_ended_at(self) -> Optional[datetime]:
-        if all(job.execution_ended_at is not None for job in self.jobs):
-            return max(job.execution_ended_at for job in self.jobs)
-        return None
-
-    @property
-    @_self_reload(_MANAGER_NAME)
-    def execution_duration(self) -> Optional[float]:
-        """Get the duration of the submission in seconds.
-
-        Returns:
-            Optional[float]: The duration of the submission in seconds. If the job is not
-            completed, None is returned.
-        """
-        if self.execution_started_at and self.execution_ended_at:
-            return (self.execution_ended_at - self.execution_started_at).total_seconds()
-        return None
-
     def get_label(self) -> str:
         """Returns the submission simple label prefixed by its owner label.
 

+ 13 - 8
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -535,16 +535,18 @@ def test_submit_duration_development_mode():
     jobs = submission.jobs
     orchestrator.stop()
 
-    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
-    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    assert all(isinstance(job.submitted_time, datetime) for job in jobs)
+    assert all(isinstance(job.run_time, datetime) for job in jobs)
+    assert all(isinstance(job.finished_time, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1
     assert jobs_2s.execution_duration >= 2
 
     assert submission.execution_duration >= 3
-    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
-    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
+    assert submission.submitted_time == min(jobs_1s.submitted_time, jobs_2s.submitted_time)
+    assert submission.run_time == min(jobs_1s.run_time, jobs_2s.run_time)
+    assert submission.finished_time == max(jobs_1s.finished_time, jobs_2s.finished_time)
 
 
 @pytest.mark.standalone
@@ -564,17 +566,20 @@ def test_submit_duration_standalone_mode():
     submission = taipy.submit(scenario)
     jobs = submission.jobs
 
+    assert_true_after_time(jobs[0].is_completed)
     assert_true_after_time(jobs[1].is_completed)
 
     orchestrator.stop()
 
-    assert all(isinstance(job.execution_started_at, datetime) for job in jobs)
-    assert all(isinstance(job.execution_ended_at, datetime) for job in jobs)
+    assert all(isinstance(job.submitted_time, datetime) for job in jobs)
+    assert all(isinstance(job.run_time, datetime) for job in jobs)
+    assert all(isinstance(job.finished_time, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1
     assert jobs_2s.execution_duration >= 2
 
     assert submission.execution_duration >= 2  # Both tasks are executed in parallel so the duration may smaller than 3
-    assert submission.execution_started_at == min(jobs_1s.execution_started_at, jobs_2s.execution_started_at)
-    assert submission.execution_ended_at == max(jobs_1s.execution_ended_at, jobs_2s.execution_ended_at)
+    assert submission.submitted_time == min(jobs_1s.submitted_time, jobs_2s.submitted_time)
+    assert submission.run_time == min(jobs_1s.run_time, jobs_2s.run_time)
+    assert submission.finished_time == max(jobs_1s.finished_time, jobs_2s.finished_time)

+ 0 - 28
tests/core/submission/test_submission.py

@@ -903,31 +903,3 @@ def test_is_finished():
     submission.submission_status = SubmissionStatus.COMPLETED
     assert submission.submission_status == SubmissionStatus.COMPLETED
     assert submission.is_finished()
-
-
-def test_execution_duration():
-    task = Task(config_id="task_1", properties={}, function=print, id=TaskId("task_1"))
-    submission = Submission(task.id, task._ID_PREFIX, task.config_id, properties={})
-    job_1 = Job("job_1", task, submission.id, submission.entity_id)
-    job_2 = Job("job_2", task, submission.id, submission.entity_id)
-
-    _TaskManagerFactory._build_manager()._set(task)
-    _SubmissionManagerFactory._build_manager()._set(submission)
-    _JobManagerFactory._build_manager()._set(job_1)
-    _JobManagerFactory._build_manager()._set(job_2)
-
-    submission.jobs = [job_1, job_2]
-    _SubmissionManagerFactory._build_manager()._set(submission)
-
-    job_1.execution_started_at = datetime(2024, 1, 1, 0, 0, 0)
-    job_1.execution_ended_at = datetime(2024, 1, 1, 0, 0, 10)
-    job_2.execution_started_at = datetime(2024, 1, 1, 0, 1, 0)
-    job_2.execution_ended_at = datetime(2024, 1, 1, 0, 2, 30)
-    assert submission.execution_started_at == job_1.execution_started_at
-    assert submission.execution_ended_at == job_2.execution_ended_at
-    assert submission.execution_duration == 150
-
-    job_2.execution_ended_at = None  # job_2 is still running
-    assert submission.execution_started_at == job_1.execution_started_at
-    assert submission.execution_ended_at is None
-    assert submission.execution_duration is None