فهرست منبع

Merge branch 'feature/#1704-storing-job-status-change-timestamp' into feature/add-submission-status-change-attributre

trgiangdo 7 ماه پیش
والد
کامیت
4be5d07d44
3فایلهای تغییر یافته به همراه50 افزوده شده و 53 حذف شده
  1. 26 28
      taipy/core/job/job.py
  2. 8 9
      tests/core/_orchestrator/test_orchestrator__submit.py
  3. 16 16
      tests/core/job/test_job.py

+ 26 - 28
taipy/core/job/job.py

@@ -148,7 +148,7 @@ class Job(_Entity, _Labeled):
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def submitted_time(self) -> datetime:
+    def submitted_at(self) -> datetime:
         """Get the date time when the job was submitted.
 
         Returns:
@@ -158,18 +158,18 @@ class Job(_Entity, _Labeled):
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def run_time(self) -> Optional[datetime]:
+    def run_at(self) -> Optional[datetime]:
         """Get the date time when the job was run.
 
         Returns:
             Optional[datetime]: The date time when the job was run.
                 If the job is not run, None is returned.
         """
-        return self._status_change_records.get("RUNNING", None)
+        return self._status_change_records.get(Status.RUNNING.name, None)
 
     @property
     @_self_reload(_MANAGER_NAME)
-    def finished_time(self) -> Optional[datetime]:
+    def finished_at(self) -> Optional[datetime]:
         """Get the date time when the job was finished.
 
         Returns:
@@ -178,15 +178,15 @@ class Job(_Entity, _Labeled):
         """
         if self.is_finished():
             if self.is_completed():
-                return self._status_change_records["COMPLETED"]
+                return self._status_change_records[Status.COMPLETED.name]
             elif self.is_failed():
-                return self._status_change_records["FAILED"]
+                return self._status_change_records[Status.FAILED.name]
             elif self.is_canceled():
-                return self._status_change_records["CANCELED"]
+                return self._status_change_records[Status.CANCELED.name]
             elif self.is_skipped():
-                return self._status_change_records["SKIPPED"]
+                return self._status_change_records[Status.SKIPPED.name]
             elif self.is_abandoned():
-                return self._status_change_records["ABANDONED"]
+                return self._status_change_records[Status.ABANDONED.name]
 
         return None
 
@@ -202,13 +202,13 @@ class Job(_Entity, _Labeled):
                 - If the job is not finished, the execution time is the duration
                   from the running time to the current time.
         """
-        if "RUNNING" not in self._status_change_records:
+        if Status.RUNNING.name not in self._status_change_records:
             return None
 
         if self.is_finished():
-            return (self.finished_time - self._status_change_records["RUNNING"]).total_seconds()
+            return (self.finished_at - self._status_change_records[Status.RUNNING.name]).total_seconds()
 
-        return (datetime.now() - self._status_change_records["RUNNING"]).total_seconds()
+        return (datetime.now() - self._status_change_records[Status.RUNNING.name]).total_seconds()
 
     @property
     @_self_reload(_MANAGER_NAME)
@@ -221,13 +221,15 @@ class Job(_Entity, _Labeled):
                 - If the job is not pending, the pending time is the duration
                   from the submission to the current time.
         """
-        if "PENDING" not in self._status_change_records:
+        if Status.PENDING.name not in self._status_change_records:
             return None
 
         if self.is_finished() or self.is_running():
-            return (self._status_change_records["RUNNING"] - self._status_change_records["PENDING"]).total_seconds()
+            return (
+                self._status_change_records[Status.RUNNING.name] - self._status_change_records[Status.PENDING.name]
+            ).total_seconds()
 
-        return (datetime.now() - self._status_change_records["PENDING"]).total_seconds()
+        return (datetime.now() - self._status_change_records[Status.PENDING.name]).total_seconds()
 
     @property
     @_self_reload(_MANAGER_NAME)
@@ -240,15 +242,19 @@ class Job(_Entity, _Labeled):
                 - If the job is not blocked, the blocked time is the duration
                   from the submission to the current time.
         """
-        if "BLOCKED" not in self._status_change_records:
+        if Status.BLOCKED.name not in self._status_change_records:
             return None
 
+        if Status.PENDING.name in self._status_change_records:
+            return (
+                self._status_change_records[Status.PENDING.name] - self._status_change_records[Status.BLOCKED.name]
+            ).total_seconds()
         if self.is_finished():
-            return (self.finished_time - self._status_change_records["BLOCKED"]).total_seconds()
-        if self.is_running():
-            return (self._status_change_records["RUNNING"] - self._status_change_records["BLOCKED"]).total_seconds()
+            return (self.finished_at - self._status_change_records[Status.BLOCKED.name]).total_seconds()
 
-        return (datetime.now() - self._status_change_records["BLOCKED"]).total_seconds()
+        # If pending time is not recorded, and the job is not finished, the only possible status left is blocked
+        # which means the current status is blocked.
+        return (datetime.now() - self._status_change_records[Status.BLOCKED.name]).total_seconds()
 
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
@@ -285,50 +291,42 @@ class Job(_Entity, _Labeled):
     @_run_callbacks
     def blocked(self):
         """Set the status to _blocked_ and notify subscribers."""
-        self._status_change_records["BLOCKED"] = datetime.now()
         self.status = Status.BLOCKED
 
     @_run_callbacks
     def pending(self):
         """Set the status to _pending_ and notify subscribers."""
-        self._status_change_records["PENDING"] = datetime.now()
         self.status = Status.PENDING
 
     @_run_callbacks
     def running(self):
         """Set the status to _running_ and notify subscribers."""
-        self._status_change_records["RUNNING"] = datetime.now()
         self.status = Status.RUNNING
 
     @_run_callbacks
     def canceled(self):
         """Set the status to _canceled_ and notify subscribers."""
-        self._status_change_records["CANCELED"] = datetime.now()
         self.status = Status.CANCELED
 
     @_run_callbacks
     def abandoned(self):
         """Set the status to _abandoned_ and notify subscribers."""
-        self._status_change_records["ABANDONED"] = datetime.now()
         self.status = Status.ABANDONED
 
     @_run_callbacks
     def failed(self):
         """Set the status to _failed_ and notify subscribers."""
-        self._status_change_records["FAILED"] = datetime.now()
         self.status = Status.FAILED
 
     @_run_callbacks
     def completed(self):
         """Set the status to _completed_ and notify subscribers."""
-        self._status_change_records["COMPLETED"] = datetime.now()
         self.status = Status.COMPLETED
         self.__logger.info(f"job {self.id} is completed.")
 
     @_run_callbacks
     def skipped(self):
         """Set the status to _skipped_ and notify subscribers."""
-        self._status_change_records["SKIPPED"] = datetime.now()
         self.status = Status.SKIPPED
 
     def is_failed(self) -> bool:

+ 8 - 9
tests/core/_orchestrator/test_orchestrator__submit.py

@@ -28,6 +28,7 @@ from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
 from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.core.task._task_manager import _TaskManager
+from tests.core.utils import assert_true_after_time
 
 
 def nothing(*args, **kwargs):
@@ -534,9 +535,9 @@ def test_submit_duration_development_mode():
     jobs = submission.jobs
     orchestrator.stop()
 
-    assert all(isinstance(job.submitted_time, datetime) for job in jobs)
-    assert all(isinstance(job.run_time, datetime) for job in jobs)
-    assert all(isinstance(job.finished_time, datetime) for job in jobs)
+    assert all(isinstance(job.submitted_at, datetime) for job in jobs)
+    assert all(isinstance(job.run_at, datetime) for job in jobs)
+    assert all(isinstance(job.finished_at, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1
@@ -564,16 +565,14 @@ def test_submit_duration_standalone_mode():
     _ScenarioManager._set(scenario)
     submission = taipy.submit(scenario)
 
-    while not all(job is not None and job.is_completed() for job in submission.jobs):
-        sleep(1)  # Limit CPU usage
-
+    assert_true_after_time(lambda: all(job is not None and job.is_completed() for job in submission.jobs))
     orchestrator.stop()
 
     jobs = submission.jobs
 
-    assert all(isinstance(job.submitted_time, datetime) for job in jobs)
-    assert all(isinstance(job.run_time, datetime) for job in jobs)
-    assert all(isinstance(job.finished_time, datetime) for job in jobs)
+    assert all(isinstance(job.submitted_at, datetime) for job in jobs)
+    assert all(isinstance(job.run_at, datetime) for job in jobs)
+    assert all(isinstance(job.finished_at, datetime) for job in jobs)
     jobs_1s = jobs[0] if jobs[0].task.config_id == "task_config_id_1" else jobs[1]
     jobs_2s = jobs[0] if jobs[0].task.config_id == "task_config_id_2" else jobs[1]
     assert jobs_1s.execution_duration >= 1

+ 16 - 16
tests/core/job/test_job.py

@@ -335,53 +335,53 @@ def test_status_records(job_id):
     _JobManager._set(job_1)
 
     assert job_1._status_change_records == {"SUBMITTED": datetime(2024, 9, 25, 13, 30, 30)}
-    assert job_1.submitted_time == datetime(2024, 9, 25, 13, 30, 30)
+    assert job_1.submitted_at == datetime(2024, 9, 25, 13, 30, 30)
     assert job_1.execution_duration is None
 
     with freezegun.freeze_time("2024-09-25 13:35:30"):
-        job_1.pending()
+        job_1.blocked()
     assert job_1._status_change_records == {
         "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
-        "PENDING": datetime(2024, 9, 25, 13, 35, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
     }
     assert job_1.execution_duration is None
     with freezegun.freeze_time("2024-09-25 13:36:00"):
-        assert job_1.pending_duration == 30
+        assert job_1.blocked_duration == 30  # = 13:36:00 - 13:35:30
 
     with freezegun.freeze_time("2024-09-25 13:40:30"):
-        job_1.blocked()
+        job_1.pending()
     assert job_1._status_change_records == {
         "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
-        "PENDING": datetime(2024, 9, 25, 13, 35, 30),
-        "BLOCKED": datetime(2024, 9, 25, 13, 40, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
     }
     assert job_1.execution_duration is None
     with freezegun.freeze_time("2024-09-25 13:41:00"):
-        assert job_1.blocked_duration == 30
+        assert job_1.pending_duration == 30  # = 13:41:00 - 13:40:30
 
     with freezegun.freeze_time("2024-09-25 13:50:30"):
         job_1.running()
     assert job_1._status_change_records == {
         "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
-        "PENDING": datetime(2024, 9, 25, 13, 35, 30),
-        "BLOCKED": datetime(2024, 9, 25, 13, 40, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
         "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
     }
-    assert job_1.run_time == datetime(2024, 9, 25, 13, 50, 30)
-    assert job_1.pending_duration == 900
-    assert job_1.blocked_duration == 600
+    assert job_1.run_at == datetime(2024, 9, 25, 13, 50, 30)
+    assert job_1.blocked_duration == 300  # = 13:40:30 - 13:35:30
+    assert job_1.pending_duration == 600  # = 13:50:30 - 13:40:30
     assert job_1.execution_duration > 0
 
     with freezegun.freeze_time("2024-09-25 13:56:35"):
         job_1.completed()
     assert job_1._status_change_records == {
         "SUBMITTED": datetime(2024, 9, 25, 13, 30, 30),
-        "PENDING": datetime(2024, 9, 25, 13, 35, 30),
-        "BLOCKED": datetime(2024, 9, 25, 13, 40, 30),
+        "BLOCKED": datetime(2024, 9, 25, 13, 35, 30),
+        "PENDING": datetime(2024, 9, 25, 13, 40, 30),
         "RUNNING": datetime(2024, 9, 25, 13, 50, 30),
         "COMPLETED": datetime(2024, 9, 25, 13, 56, 35),
     }
-    assert job_1.execution_duration == 365
+    assert job_1.execution_duration == 365  # = 13:56:35 - 13:50:30
 
 
 def test_is_deletable():