job.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. __all__ = ["Job"]
  12. from datetime import datetime
  13. from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
  14. from taipy.logger._taipy_logger import _TaipyLogger
  15. from .._entity._entity import _Entity
  16. from .._entity._labeled import _Labeled
  17. from .._entity._reload import _self_reload, _self_setter
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from ..common._utils import _fcts_to_dict
  20. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  21. from ..reason import ReasonCollection
  22. from .job_id import JobId
  23. from .status import Status
  24. if TYPE_CHECKING:
  25. from ..task.task import Task
  26. def _run_callbacks(fn):
  27. def __run_callbacks(job):
  28. fn(job)
  29. _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.")
  30. for fct in job._subscribers:
  31. fct(job)
  32. return __run_callbacks
  33. class Job(_Entity, _Labeled):
  34. """Execution of a `Task^`.
  35. Task, Sequence, and Scenario entities can be submitted for execution. The submission
  36. of a scenario triggers the submission of all the contained tasks. Similarly, the submission
  37. of a sequence also triggers the execution of all the ordered tasks.
  38. Every time a task is submitted for execution, a new *Job* is created. A job represents a
  39. single execution of a task. It holds all the information related to the task execution,
  40. including the **creation date**, the execution `Status^`, the timestamp of status changes,
  41. and the **stacktrace** of any exception that may be raised by the user function.
  42. In addition, a job notifies scenario or sequence subscribers on its status change.
  43. Attributes:
  44. id (str): The identifier of this job.
  45. task (Task^): The task of this job.
  46. force (bool): Enforce the job's execution whatever the output data nodes are in cache or
  47. not.
  48. status (Status^): The current status of this job.
  49. creation_date (datetime): The date of this job's creation.
  50. stacktrace (List[str]): The list of stacktraces of the exceptions raised during the
  51. execution.
  52. version (str): The string indicates the application version of the job to instantiate.
  53. If not provided, the latest version is used.
  54. """
  55. _MANAGER_NAME = "job"
  56. _ID_PREFIX = "JOB"
  57. def __init__(self, id: JobId, task: "Task", submit_id: str, submit_entity_id: str, force=False, version=None):
  58. self.id = id
  59. self._task = task
  60. self._force = force
  61. self._status = Status.SUBMITTED
  62. self._creation_date = datetime.now()
  63. self._submit_id: str = submit_id
  64. self._submit_entity_id: str = submit_entity_id
  65. self._status_change_records: Dict[str, datetime] = {"SUBMITTED": self._creation_date}
  66. self._subscribers: List[Callable] = []
  67. self._stacktrace: List[str] = []
  68. self.__logger = _TaipyLogger._get_logger()
  69. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  70. def get_event_context(self):
  71. return {"task_config_id": self._task.config_id}
  72. @property # type: ignore
  73. @_self_reload(_MANAGER_NAME)
  74. def task(self):
  75. return self._task
  76. @task.setter # type: ignore
  77. @_self_setter(_MANAGER_NAME)
  78. def task(self, val):
  79. self._task = val
  80. @property
  81. def owner_id(self) -> str:
  82. return self.task.id
  83. @property # type: ignore
  84. @_self_reload(_MANAGER_NAME)
  85. def force(self):
  86. return self._force
  87. @force.setter # type: ignore
  88. @_self_setter(_MANAGER_NAME)
  89. def force(self, val):
  90. self._force = val
  91. @property
  92. def submit_id(self):
  93. return self._submit_id
  94. @property
  95. def submit_entity_id(self):
  96. return self._submit_entity_id
  97. @property # type: ignore
  98. def submit_entity(self):
  99. from ..taipy import get as tp_get
  100. return tp_get(self._submit_entity_id)
  101. @property # type: ignore
  102. @_self_reload(_MANAGER_NAME)
  103. def status(self):
  104. return self._status
  105. @status.setter # type: ignore
  106. @_self_setter(_MANAGER_NAME)
  107. def status(self, val):
  108. self._status_change_records[val.name] = datetime.now()
  109. self._status = val
  110. @property # type: ignore
  111. @_self_reload(_MANAGER_NAME)
  112. def creation_date(self):
  113. return self._creation_date
  114. @creation_date.setter # type: ignore
  115. @_self_setter(_MANAGER_NAME)
  116. def creation_date(self, val):
  117. self._creation_date = val
  118. @property
  119. @_self_reload(_MANAGER_NAME)
  120. def submitted_at(self) -> datetime:
  121. """Get the date time when the job was submitted.
  122. Returns:
  123. datetime: The date time when the job was submitted.
  124. """
  125. return self._status_change_records["SUBMITTED"]
  126. @property
  127. @_self_reload(_MANAGER_NAME)
  128. def run_at(self) -> Optional[datetime]:
  129. """Get the date time when the job was run.
  130. Returns:
  131. Optional[datetime]: The date time when the job was run.
  132. If the job is not run, None is returned.
  133. """
  134. return self._status_change_records.get(Status.RUNNING.name, None)
  135. @property
  136. @_self_reload(_MANAGER_NAME)
  137. def finished_at(self) -> Optional[datetime]:
  138. """Get the date time when the job was finished.
  139. Returns:
  140. Optional[datetime]: The date time when the job was finished.
  141. If the job is not finished, None is returned.
  142. """
  143. if self.is_finished():
  144. if self.is_completed():
  145. return self._status_change_records[Status.COMPLETED.name]
  146. elif self.is_failed():
  147. return self._status_change_records[Status.FAILED.name]
  148. elif self.is_canceled():
  149. return self._status_change_records[Status.CANCELED.name]
  150. elif self.is_skipped():
  151. return self._status_change_records[Status.SKIPPED.name]
  152. elif self.is_abandoned():
  153. return self._status_change_records[Status.ABANDONED.name]
  154. return None
  155. @property
  156. @_self_reload(_MANAGER_NAME)
  157. def execution_duration(self) -> Optional[float]:
  158. """Get the duration of the job execution in seconds.
  159. The execution time is the duration from the job running to the job completion.
  160. Returns:
  161. Optional[float]: The duration of the job execution in seconds.
  162. - If the job was not run, None is returned.
  163. - If the job is not finished, the execution time is the duration
  164. from the running time to the current time.
  165. """
  166. if Status.RUNNING.name not in self._status_change_records:
  167. return None
  168. if self.is_finished():
  169. return (self.finished_at - self._status_change_records[Status.RUNNING.name]).total_seconds()
  170. return (datetime.now() - self._status_change_records[Status.RUNNING.name]).total_seconds()
  171. @property
  172. @_self_reload(_MANAGER_NAME)
  173. def pending_duration(self) -> Optional[float]:
  174. """Get the duration of the job in the pending state in seconds.
  175. Returns:
  176. Optional[float]: The duration of the job in the pending state in seconds.
  177. - If the job is not running, None is returned.
  178. - If the job is not pending, the pending time is the duration
  179. from the submission to the current time.
  180. """
  181. if Status.PENDING.name not in self._status_change_records:
  182. return None
  183. if self.is_finished() or self.is_running():
  184. return (
  185. self._status_change_records[Status.RUNNING.name] - self._status_change_records[Status.PENDING.name]
  186. ).total_seconds()
  187. return (datetime.now() - self._status_change_records[Status.PENDING.name]).total_seconds()
  188. @property
  189. @_self_reload(_MANAGER_NAME)
  190. def blocked_duration(self) -> Optional[float]:
  191. """Get the duration of the job in the blocked state in seconds.
  192. Returns:
  193. Optional[float]: The duration of the job in the blocked state in seconds.
  194. - If the job is not running, None is returned.
  195. - If the job is not blocked, the blocked time is the duration
  196. from the submission to the current time.
  197. """
  198. if Status.BLOCKED.name not in self._status_change_records:
  199. return None
  200. if Status.PENDING.name in self._status_change_records:
  201. return (
  202. self._status_change_records[Status.PENDING.name] - self._status_change_records[Status.BLOCKED.name]
  203. ).total_seconds()
  204. if self.is_finished():
  205. return (self.finished_at - self._status_change_records[Status.BLOCKED.name]).total_seconds()
  206. # If pending time is not recorded, and the job is not finished, the only possible status left is blocked
  207. # which means the current status is blocked.
  208. return (datetime.now() - self._status_change_records[Status.BLOCKED.name]).total_seconds()
  209. @property # type: ignore
  210. @_self_reload(_MANAGER_NAME)
  211. def stacktrace(self) -> List[str]:
  212. return self._stacktrace
  213. @stacktrace.setter # type: ignore
  214. @_self_setter(_MANAGER_NAME)
  215. def stacktrace(self, val):
  216. self._stacktrace = val
  217. @property
  218. def version(self):
  219. return self._version
  220. def __contains__(self, task: "Task"):
  221. return self.task.id == task.id
  222. def __lt__(self, other):
  223. return self.creation_date.timestamp() < other.creation_date.timestamp()
  224. def __le__(self, other):
  225. return self.creation_date.timestamp() <= other.creation_date.timestamp()
  226. def __gt__(self, other):
  227. return self.creation_date.timestamp() > other.creation_date.timestamp()
  228. def __ge__(self, other):
  229. return self.creation_date.timestamp() >= other.creation_date.timestamp()
  230. def __eq__(self, other):
  231. return isinstance(other, Job) and self.id == other.id
  232. @_run_callbacks
  233. def blocked(self):
  234. """Set the status to _blocked_ and notify subscribers."""
  235. self.status = Status.BLOCKED
  236. @_run_callbacks
  237. def pending(self):
  238. """Set the status to _pending_ and notify subscribers."""
  239. self.status = Status.PENDING
  240. @_run_callbacks
  241. def running(self):
  242. """Set the status to _running_ and notify subscribers."""
  243. self.status = Status.RUNNING
  244. @_run_callbacks
  245. def canceled(self):
  246. """Set the status to _canceled_ and notify subscribers."""
  247. self.status = Status.CANCELED
  248. @_run_callbacks
  249. def abandoned(self):
  250. """Set the status to _abandoned_ and notify subscribers."""
  251. self.status = Status.ABANDONED
  252. @_run_callbacks
  253. def failed(self):
  254. """Set the status to _failed_ and notify subscribers."""
  255. self.status = Status.FAILED
  256. @_run_callbacks
  257. def completed(self):
  258. """Set the status to _completed_ and notify subscribers."""
  259. self.status = Status.COMPLETED
  260. self.__logger.info(f"job {self.id} is completed.")
  261. @_run_callbacks
  262. def skipped(self):
  263. """Set the status to _skipped_ and notify subscribers."""
  264. self.status = Status.SKIPPED
  265. def is_failed(self) -> bool:
  266. """Indicate if the job has failed.
  267. Returns:
  268. True if the job has failed.
  269. """
  270. return self.status == Status.FAILED
  271. def is_blocked(self) -> bool:
  272. """Indicate if the job is blocked.
  273. Returns:
  274. True if the job is blocked.
  275. """
  276. return self.status == Status.BLOCKED
  277. def is_canceled(self) -> bool:
  278. """Indicate if the job was canceled.
  279. Returns:
  280. True if the job was canceled.
  281. """
  282. return self.status == Status.CANCELED
  283. def is_abandoned(self) -> bool:
  284. """Indicate if the job was abandoned.
  285. Returns:
  286. True if the job was abandoned.
  287. """
  288. return self.status == Status.ABANDONED
  289. def is_submitted(self) -> bool:
  290. """Indicate if the job is submitted.
  291. Returns:
  292. True if the job is submitted.
  293. """
  294. return self.status == Status.SUBMITTED
  295. def is_completed(self) -> bool:
  296. """Indicate if the job has completed.
  297. Returns:
  298. True if the job has completed.
  299. """
  300. return self.status == Status.COMPLETED
  301. def is_skipped(self) -> bool:
  302. """Indicate if the job was skipped.
  303. Returns:
  304. True if the job was skipped.
  305. """
  306. return self.status == Status.SKIPPED
  307. def is_running(self) -> bool:
  308. """Indicate if the job is running.
  309. Returns:
  310. True if the job is running.
  311. """
  312. return self.status == Status.RUNNING
  313. def is_pending(self) -> bool:
  314. """Indicate if the job is pending.
  315. Returns:
  316. True if the job is pending.
  317. """
  318. return self.status == Status.PENDING
  319. def is_finished(self) -> bool:
  320. """Indicate if the job is finished.
  321. Returns:
  322. True if the job is finished.
  323. """
  324. return self.is_completed() or self.is_failed() or self.is_canceled() or self.is_skipped() or self.is_abandoned()
  325. def _is_finished(self) -> bool:
  326. """Indicate if the job is finished. This function will not trigger the persistence feature like is_finished().
  327. Returns:
  328. True if the job is finished.
  329. """
  330. return self._status in [Status.COMPLETED, Status.FAILED, Status.CANCELED, Status.SKIPPED, Status.ABANDONED]
  331. def _on_status_change(self, *functions):
  332. """Get a notification when the status of the job changes.
  333. Job are assigned different statuses (_submitted_, _pending_, etc.) before being finished.
  334. You can be triggered on each change through this function except for the _submitted_
  335. status.
  336. Parameters:
  337. functions: Callables that will be called on each status change.
  338. """
  339. functions = list(functions)
  340. function = functions.pop()
  341. self._subscribers.append(function)
  342. if functions:
  343. self._on_status_change(*functions)
  344. def __hash__(self):
  345. return hash(self.id)
  346. def _unlock_edit_on_outputs(self):
  347. for dn in self.task.output.values():
  348. dn.unlock_edit()
  349. @staticmethod
  350. def _serialize_subscribers(subscribers: List) -> List:
  351. return _fcts_to_dict(subscribers)
  352. def get_label(self) -> str:
  353. """Returns the job simple label prefixed by its owner label.
  354. Returns:
  355. The label of the job as a string.
  356. """
  357. return self._get_label()
  358. def get_simple_label(self) -> str:
  359. """Returns the job simple label.
  360. Returns:
  361. The simple label of the job as a string.
  362. """
  363. return self._get_simple_label()
  364. def is_deletable(self) -> ReasonCollection:
  365. """Indicate if the job can be deleted.
  366. Returns:
  367. A ReasonCollection object that can function as a Boolean value,
  368. which is True if the job can be deleted. False otherwise.
  369. """
  370. from ... import core as tp
  371. return tp.is_deletable(self)
  372. @_make_event.register(Job)
  373. def _make_event_for_job(
  374. job: Job,
  375. operation: EventOperation,
  376. /,
  377. attribute_name: Optional[str] = None,
  378. attribute_value: Optional[Any] = None,
  379. **kwargs,
  380. ) -> Event:
  381. metadata = {
  382. "creation_date": job._creation_date,
  383. "task_config_id": job._task.config_id,
  384. "version": job._version,
  385. **kwargs,
  386. }
  387. return Event(
  388. entity_type=EventEntityType.JOB,
  389. entity_id=job.id,
  390. operation=operation,
  391. attribute_name=attribute_name,
  392. attribute_value=attribute_value,
  393. metadata=metadata,
  394. )