123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- __all__ = ["Job"]
- from datetime import datetime
- from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
- from taipy.logger._taipy_logger import _TaipyLogger
- from .._entity._entity import _Entity
- from .._entity._labeled import _Labeled
- from .._entity._reload import _self_reload, _self_setter
- from .._version._version_manager_factory import _VersionManagerFactory
- from ..common._utils import _fcts_to_dict
- from ..notification.event import Event, EventEntityType, EventOperation, _make_event
- from ..reason import ReasonCollection
- from .job_id import JobId
- from .status import Status
- if TYPE_CHECKING:
- from ..task.task import Task
def _run_callbacks(fn):
    """Decorate a job status method so that subscribers are notified after it runs.

    The returned wrapper calls `fn(job)`, emits a debug log line with the job's
    new status, then calls every callable in `job._subscribers` with the job,
    in list order. The return value of `fn` is discarded.

    Parameters:
        fn: The status-changing method to wrap. It is called with the job as
            its only argument.

    Returns:
        The wrapping function. It carries the name and docstring of `fn`.
    """
    from functools import wraps

    # `wraps` keeps the decorated method's __name__/__doc__ so that introspection
    # and generated documentation show the real method, not the inner wrapper.
    @wraps(fn)
    def __run_callbacks(job):
        fn(job)
        _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.")
        for fct in job._subscribers:
            fct(job)

    return __run_callbacks
class Job(_Entity, _Labeled):
    """Execution of a `Task^`.

    Task, Sequence, and Scenario entities can be submitted for execution. The submission
    of a scenario triggers the submission of all the contained tasks. Similarly, the submission
    of a sequence also triggers the execution of all the ordered tasks.

    Every time a task is submitted for execution, a new *Job* is created. A job represents a
    single execution of a task. It holds all the information related to the task execution,
    including the **creation date**, the execution `Status^`, the timestamp of status changes,
    and the **stacktrace** of any exception that may be raised by the user function.

    In addition, a job notifies scenario or sequence subscribers on its status change.

    Attributes:
        id (str): The identifier of this job.
        task (Task^): The task of this job.
        force (bool): Enforce the job's execution whatever the output data nodes are in cache or
            not.
        status (Status^): The current status of this job.
        creation_date (datetime): The date of this job's creation.
        stacktrace (List[str]): The list of stacktraces of the exceptions raised during the
            execution.
        version (str): The string indicates the application version of the job to instantiate.
            If not provided, the latest version is used.
    """

    _MANAGER_NAME = "job"
    _ID_PREFIX = "JOB"

    def __init__(
        self,
        id: JobId,
        task: "Task",
        submit_id: str,
        submit_entity_id: str,
        force: bool = False,
        version: Optional[str] = None,
    ):
        """Create a job in the SUBMITTED status.

        Parameters:
            id: The identifier of the job.
            task: The task this job executes.
            submit_id: The identifier of the submission this job belongs to.
            submit_entity_id: The identifier of the entity that was submitted.
            force: Value stored as the job's `force` attribute.
            version: The application version. When falsy, the latest version
                returned by the version manager is used.
        """
        self.id = id
        self._task = task
        self._force = force
        self._status = Status.SUBMITTED
        self._creation_date = datetime.now()
        self._submit_id: str = submit_id
        self._submit_entity_id: str = submit_entity_id
        # Maps a status name to the date time of the latest switch to that status.
        self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
        self._subscribers: List[Callable] = []
        self._stacktrace: List[str] = []
        self.__logger = _TaipyLogger._get_logger()
        self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()

    def get_event_context(self):
        """Return the event context of this job: the config id of its task."""
        return {"task_config_id": self._task.config_id}

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def task(self):
        return self._task

    @task.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def task(self, val):
        self._task = val

    @property
    def owner_id(self) -> str:
        return self.task.id

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def force(self):
        return self._force

    @force.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def force(self, val):
        self._force = val

    @property
    def submit_id(self):
        return self._submit_id

    @property
    def submit_entity_id(self):
        return self._submit_entity_id

    @property  # type: ignore
    def submit_entity(self):
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        from ..taipy import get as tp_get

        return tp_get(self._submit_entity_id)

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def status(self):
        return self._status

    @status.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def status(self, val):
        # Record when the status was entered; a repeated status overwrites its previous timestamp.
        self._status_change_records[val.name] = datetime.now()
        self._status = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def creation_date(self):
        return self._creation_date

    @creation_date.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def creation_date(self, val):
        self._creation_date = val

    @property
    @_self_reload(_MANAGER_NAME)
    def submitted_at(self) -> datetime:
        """Get the date time when the job was submitted.

        Returns:
            datetime: The date time when the job was submitted.
        """
        return self._status_change_records["SUBMITTED"]

    @property
    @_self_reload(_MANAGER_NAME)
    def run_at(self) -> Optional[datetime]:
        """Get the date time when the job was run.

        Returns:
            Optional[datetime]: The date time when the job was run.
                If the job is not run, None is returned.
        """
        return self._status_change_records.get(Status.RUNNING.name, None)

    @property
    @_self_reload(_MANAGER_NAME)
    def finished_at(self) -> Optional[datetime]:
        """Get the date time when the job was finished.

        Returns:
            Optional[datetime]: The date time when the job was finished.
                If the job is not finished, None is returned.
        """
        # Read the status once, then look up the timestamp recorded for it.
        current_status = self.status
        if current_status in (
            Status.COMPLETED,
            Status.FAILED,
            Status.CANCELED,
            Status.SKIPPED,
            Status.ABANDONED,
        ):
            return self._status_change_records[current_status.name]
        return None

    @property
    @_self_reload(_MANAGER_NAME)
    def execution_duration(self) -> Optional[float]:
        """Get the duration of the job execution in seconds.

        The execution time is the duration from the job running to the job completion.

        Returns:
            Optional[float]: The duration of the job execution in seconds.
                - If the job was not run, None is returned.
                - If the job is not finished, the execution time is the duration
                  from the running time to the current time.
        """
        records = self._status_change_records
        if Status.RUNNING.name not in records:
            return None

        started_at = records[Status.RUNNING.name]
        if self.is_finished():
            return (self.finished_at - started_at).total_seconds()

        return (datetime.now() - started_at).total_seconds()

    @property
    @_self_reload(_MANAGER_NAME)
    def pending_duration(self) -> Optional[float]:
        """Get the duration of the job in the pending state in seconds.

        Returns:
            Optional[float]: The duration of the job in the pending state in seconds.
                - If the job has never been pending, None is returned.
                - If the job is running or finished, the pending time is the duration
                  from the pending time to the running time, or to the finish time
                  when the job finished without ever running.
                - Otherwise, the pending time is the duration from the pending time
                  to the current time.
        """
        records = self._status_change_records
        if Status.PENDING.name not in records:
            return None

        pending_since = records[Status.PENDING.name]
        if self.is_finished() or self.is_running():
            # Nothing prevents `skipped()`, `canceled()` or `abandoned()` from being
            # called on a pending job, so RUNNING may never have been recorded.
            pending_until = records.get(Status.RUNNING.name)
            if pending_until is None:
                pending_until = self.finished_at
            if pending_until is not None:
                return (pending_until - pending_since).total_seconds()

        return (datetime.now() - pending_since).total_seconds()

    @property
    @_self_reload(_MANAGER_NAME)
    def blocked_duration(self) -> Optional[float]:
        """Get the duration of the job in the blocked state in seconds.

        Returns:
            Optional[float]: The duration of the job in the blocked state in seconds.
                - If the job has never been blocked, None is returned.
                - If the job has been pending, the blocked time is the duration
                  from the blocked time to the pending time.
                - If the job finished without being pending, the blocked time is
                  the duration from the blocked time to the finish time.
                - Otherwise, the blocked time is the duration from the blocked time
                  to the current time.
        """
        records = self._status_change_records
        if Status.BLOCKED.name not in records:
            return None

        blocked_since = records[Status.BLOCKED.name]
        if Status.PENDING.name in records:
            return (records[Status.PENDING.name] - blocked_since).total_seconds()

        if self.is_finished():
            return (self.finished_at - blocked_since).total_seconds()

        # If pending time is not recorded, and the job is not finished, the only possible status left is blocked
        # which means the current status is blocked.
        return (datetime.now() - blocked_since).total_seconds()

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def stacktrace(self) -> List[str]:
        return self._stacktrace

    @stacktrace.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def stacktrace(self, val):
        self._stacktrace = val

    @property
    def version(self):
        return self._version

    def __contains__(self, task: "Task"):
        return self.task.id == task.id

    # Jobs are ordered by their creation date.
    def __lt__(self, other):
        return self.creation_date.timestamp() < other.creation_date.timestamp()

    def __le__(self, other):
        return self.creation_date.timestamp() <= other.creation_date.timestamp()

    def __gt__(self, other):
        return self.creation_date.timestamp() > other.creation_date.timestamp()

    def __ge__(self, other):
        return self.creation_date.timestamp() >= other.creation_date.timestamp()

    def __eq__(self, other):
        return isinstance(other, Job) and self.id == other.id

    @_run_callbacks
    def blocked(self):
        """Set the status to _blocked_ and notify subscribers."""
        self.status = Status.BLOCKED

    @_run_callbacks
    def pending(self):
        """Set the status to _pending_ and notify subscribers."""
        self.status = Status.PENDING

    @_run_callbacks
    def running(self):
        """Set the status to _running_ and notify subscribers."""
        self.status = Status.RUNNING

    @_run_callbacks
    def canceled(self):
        """Set the status to _canceled_ and notify subscribers."""
        self.status = Status.CANCELED

    @_run_callbacks
    def abandoned(self):
        """Set the status to _abandoned_ and notify subscribers."""
        self.status = Status.ABANDONED

    @_run_callbacks
    def failed(self):
        """Set the status to _failed_ and notify subscribers."""
        self.status = Status.FAILED

    @_run_callbacks
    def completed(self):
        """Set the status to _completed_ and notify subscribers."""
        self.status = Status.COMPLETED
        self.__logger.info(f"job {self.id} is completed.")

    @_run_callbacks
    def skipped(self):
        """Set the status to _skipped_ and notify subscribers."""
        self.status = Status.SKIPPED

    def is_failed(self) -> bool:
        """Indicate if the job has failed.

        Returns:
            True if the job has failed.
        """
        return self.status == Status.FAILED

    def is_blocked(self) -> bool:
        """Indicate if the job is blocked.

        Returns:
            True if the job is blocked.
        """
        return self.status == Status.BLOCKED

    def is_canceled(self) -> bool:
        """Indicate if the job was canceled.

        Returns:
            True if the job was canceled.
        """
        return self.status == Status.CANCELED

    def is_abandoned(self) -> bool:
        """Indicate if the job was abandoned.

        Returns:
            True if the job was abandoned.
        """
        return self.status == Status.ABANDONED

    def is_submitted(self) -> bool:
        """Indicate if the job is submitted.

        Returns:
            True if the job is submitted.
        """
        return self.status == Status.SUBMITTED

    def is_completed(self) -> bool:
        """Indicate if the job has completed.

        Returns:
            True if the job has completed.
        """
        return self.status == Status.COMPLETED

    def is_skipped(self) -> bool:
        """Indicate if the job was skipped.

        Returns:
            True if the job was skipped.
        """
        return self.status == Status.SKIPPED

    def is_running(self) -> bool:
        """Indicate if the job is running.

        Returns:
            True if the job is running.
        """
        return self.status == Status.RUNNING

    def is_pending(self) -> bool:
        """Indicate if the job is pending.

        Returns:
            True if the job is pending.
        """
        return self.status == Status.PENDING

    def is_finished(self) -> bool:
        """Indicate if the job is finished.

        A job is finished when it is completed, failed, canceled, skipped or abandoned.

        Returns:
            True if the job is finished.
        """
        return self.is_completed() or self.is_failed() or self.is_canceled() or self.is_skipped() or self.is_abandoned()

    def _is_finished(self) -> bool:
        """Indicate if the job is finished. This function will not trigger the persistence feature like is_finished().

        Returns:
            True if the job is finished.
        """
        return self._status in [Status.COMPLETED, Status.FAILED, Status.CANCELED, Status.SKIPPED, Status.ABANDONED]

    def _on_status_change(self, *functions):
        """Get a notification when the status of the job changes.

        Job are assigned different statuses (_submitted_, _pending_, etc.) before being finished.
        You can be triggered on each change through this function except for the _submitted_
        status.

        Parameters:
            functions: Callables that will be called on each status change.
                Calling this method without any callable is a no-op.
        """
        # Callables are registered last-to-first, the order in which this method
        # has always appended them.
        self._subscribers.extend(reversed(functions))

    def __hash__(self):
        return hash(self.id)

    def _unlock_edit_on_outputs(self):
        """Call `unlock_edit()` on every output data node of the job's task."""
        for dn in self.task.output.values():
            dn.unlock_edit()

    @staticmethod
    def _serialize_subscribers(subscribers: List) -> List:
        """Convert the subscribers with `_fcts_to_dict`."""
        return _fcts_to_dict(subscribers)

    def get_label(self) -> str:
        """Returns the job simple label prefixed by its owner label.

        Returns:
            The label of the job as a string.
        """
        return self._get_label()

    def get_simple_label(self) -> str:
        """Returns the job simple label.

        Returns:
            The simple label of the job as a string.
        """
        return self._get_simple_label()

    def is_deletable(self) -> ReasonCollection:
        """Indicate if the job can be deleted.

        Returns:
            A ReasonCollection object that can function as a Boolean value,
            which is True if the job can be deleted. False otherwise.
        """
        from ... import core as tp

        return tp.is_deletable(self)
@_make_event.register(Job)
def _make_event_for_job(
    job: Job,
    operation: EventOperation,
    /,
    attribute_name: Optional[str] = None,
    attribute_value: Optional[Any] = None,
    **kwargs,
) -> Event:
    """Build the `Event` describing an operation on a job.

    Parameters:
        job: The job the event is about.
        operation: The operation performed on the job.
        attribute_name: Optional name of the attribute concerned by the event.
        attribute_value: Optional value of that attribute.
        **kwargs: Extra metadata entries. They override the default entries
            (`creation_date`, `task_config_id`, `version`) on key collision.

    Returns:
        An event of entity type JOB carrying the job id and the metadata.
    """
    event_metadata = {
        "creation_date": job._creation_date,
        "task_config_id": job._task.config_id,
        "version": job._version,
    }
    # Applied last so that caller-supplied entries take precedence.
    event_metadata.update(kwargs)

    return Event(
        entity_type=EventEntityType.JOB,
        entity_id=job.id,
        operation=operation,
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        metadata=event_metadata,
    )
|