|
@@ -12,7 +12,7 @@
|
|
|
__all__ = ["Job"]
|
|
|
|
|
|
from datetime import datetime
|
|
|
-from typing import TYPE_CHECKING, Any, Callable, List, Optional
|
|
|
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
|
|
|
|
|
|
from taipy.logger._taipy_logger import _TaipyLogger
|
|
|
|
|
@@ -49,8 +49,8 @@ class Job(_Entity, _Labeled):
|
|
|
|
|
|
Every time a task is submitted for execution, a new *Job* is created. A job represents a
|
|
|
single execution of a task. It holds all the information related to the task execution,
|
|
|
- including the **creation date**, the execution `Status^`, and the **stacktrace** of any
|
|
|
- exception that may be raised by the user function.
|
|
|
+ including the **creation date**, the execution `Status^`, the timestamp of status changes,
|
|
|
+ and the **stacktrace** of any exception that may be raised by the user function.
|
|
|
|
|
|
In addition, a job notifies scenario or sequence subscribers on its status change.
|
|
|
|
|
@@ -78,8 +78,7 @@ class Job(_Entity, _Labeled):
|
|
|
self._creation_date = datetime.now()
|
|
|
self._submit_id: str = submit_id
|
|
|
self._submit_entity_id: str = submit_entity_id
|
|
|
- self._execution_started_at: Optional[datetime] = None
|
|
|
- self._execution_ended_at: Optional[datetime] = None
|
|
|
+ self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
|
|
|
self._subscribers: List[Callable] = []
|
|
|
self._stacktrace: List[str] = []
|
|
|
self.__logger = _TaipyLogger._get_logger()
|
|
@@ -134,6 +133,7 @@ class Job(_Entity, _Labeled):
|
|
|
@status.setter # type: ignore
|
|
|
@_self_setter(_MANAGER_NAME)
|
|
|
def status(self, val):
|
|
|
+ self._status_change_records[val.name] = datetime.now()
|
|
|
self._status = val
|
|
|
|
|
|
@property # type: ignore
|
|
@@ -148,36 +148,113 @@ class Job(_Entity, _Labeled):
|
|
|
|
|
|
@property
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def execution_started_at(self) -> Optional[datetime]:
|
|
|
- return self._execution_started_at
|
|
|
+ def submitted_at(self) -> datetime:
|
|
|
+ """Get the date time when the job was submitted.
|
|
|
|
|
|
- @execution_started_at.setter
|
|
|
- @_self_setter(_MANAGER_NAME)
|
|
|
- def execution_started_at(self, val):
|
|
|
- self._execution_started_at = val
|
|
|
+ Returns:
|
|
|
+ datetime: The date time when the job was submitted.
|
|
|
+ """
|
|
|
+ return self._status_change_records["SUBMITTED"]
|
|
|
|
|
|
@property
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
- def execution_ended_at(self) -> Optional[datetime]:
|
|
|
- return self._execution_ended_at
|
|
|
+ def run_at(self) -> Optional[datetime]:
|
|
|
+ """Get the date time when the job was run.
|
|
|
|
|
|
- @execution_ended_at.setter
|
|
|
- @_self_setter(_MANAGER_NAME)
|
|
|
- def execution_ended_at(self, val):
|
|
|
- self._execution_ended_at = val
|
|
|
+ Returns:
|
|
|
+ Optional[datetime]: The date time when the job was run.
|
|
|
+ If the job is not run, None is returned.
|
|
|
+ """
|
|
|
+ return self._status_change_records.get(Status.RUNNING.name, None)
|
|
|
+
|
|
|
+ @property
|
|
|
+ @_self_reload(_MANAGER_NAME)
|
|
|
+ def finished_at(self) -> Optional[datetime]:
|
|
|
+ """Get the date time when the job was finished.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[datetime]: The date time when the job was finished.
|
|
|
+ If the job is not finished, None is returned.
|
|
|
+ """
|
|
|
+ if self.is_finished():
|
|
|
+ if self.is_completed():
|
|
|
+ return self._status_change_records[Status.COMPLETED.name]
|
|
|
+ elif self.is_failed():
|
|
|
+ return self._status_change_records[Status.FAILED.name]
|
|
|
+ elif self.is_canceled():
|
|
|
+ return self._status_change_records[Status.CANCELED.name]
|
|
|
+ elif self.is_skipped():
|
|
|
+ return self._status_change_records[Status.SKIPPED.name]
|
|
|
+ elif self.is_abandoned():
|
|
|
+ return self._status_change_records[Status.ABANDONED.name]
|
|
|
+
|
|
|
+ return None
|
|
|
|
|
|
@property
|
|
|
@_self_reload(_MANAGER_NAME)
|
|
|
def execution_duration(self) -> Optional[float]:
|
|
|
"""Get the duration of the job execution in seconds.
|
|
|
+ The execution time is the duration from the job running to the job completion.
|
|
|
|
|
|
Returns:
|
|
|
- Optional[float]: The duration of the job execution in seconds. If the job is not
|
|
|
- completed, None is returned.
|
|
|
+ Optional[float]: The duration of the job execution in seconds.
|
|
|
+ - If the job was not run, None is returned.
|
|
|
+ - If the job is not finished, the execution time is the duration
|
|
|
+ from the running time to the current time.
|
|
|
"""
|
|
|
- if self._execution_started_at and self._execution_ended_at:
|
|
|
- return (self._execution_ended_at - self._execution_started_at).total_seconds()
|
|
|
- return None
|
|
|
+ if Status.RUNNING.name not in self._status_change_records:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if self.is_finished():
|
|
|
+ return (self.finished_at - self._status_change_records[Status.RUNNING.name]).total_seconds()
|
|
|
+
|
|
|
+ return (datetime.now() - self._status_change_records[Status.RUNNING.name]).total_seconds()
|
|
|
+
|
|
|
+ @property
|
|
|
+ @_self_reload(_MANAGER_NAME)
|
|
|
+ def pending_duration(self) -> Optional[float]:
|
|
|
+ """Get the duration of the job in the pending state in seconds.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[float]: The duration of the job in the pending state in seconds.
|
|
|
+ - If the job is not running, None is returned.
|
|
|
+ - If the job is not pending, the pending time is the duration
|
|
|
+ from the submission to the current time.
|
|
|
+ """
|
|
|
+ if Status.PENDING.name not in self._status_change_records:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if self.is_finished() or self.is_running():
|
|
|
+ return (
|
|
|
+ self._status_change_records[Status.RUNNING.name] - self._status_change_records[Status.PENDING.name]
|
|
|
+ ).total_seconds()
|
|
|
+
|
|
|
+ return (datetime.now() - self._status_change_records[Status.PENDING.name]).total_seconds()
|
|
|
+
|
|
|
+ @property
|
|
|
+ @_self_reload(_MANAGER_NAME)
|
|
|
+ def blocked_duration(self) -> Optional[float]:
|
|
|
+ """Get the duration of the job in the blocked state in seconds.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Optional[float]: The duration of the job in the blocked state in seconds.
|
|
|
+ - If the job is not running, None is returned.
|
|
|
+ - If the job is not blocked, the blocked time is the duration
|
|
|
+ from the submission to the current time.
|
|
|
+ """
|
|
|
+ if Status.BLOCKED.name not in self._status_change_records:
|
|
|
+ return None
|
|
|
+
|
|
|
+ if Status.PENDING.name in self._status_change_records:
|
|
|
+ return (
|
|
|
+ self._status_change_records[Status.PENDING.name] - self._status_change_records[Status.BLOCKED.name]
|
|
|
+ ).total_seconds()
|
|
|
+ if self.is_finished():
|
|
|
+ return (self.finished_at - self._status_change_records[Status.BLOCKED.name]).total_seconds()
|
|
|
+
|
|
|
+ # If pending time is not recorded, and the job is not finished, the only possible status left is blocked
|
|
|
+ # which means the current status is blocked.
|
|
|
+ return (datetime.now() - self._status_change_records[Status.BLOCKED.name]).total_seconds()
|
|
|
|
|
|
@property # type: ignore
|
|
|
@_self_reload(_MANAGER_NAME)
|