job.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. __all__ = ["Job"]
  12. from datetime import datetime
  13. from typing import TYPE_CHECKING, Any, Callable, List, Optional
  14. from taipy.logger._taipy_logger import _TaipyLogger
  15. from .._entity._entity import _Entity
  16. from .._entity._labeled import _Labeled
  17. from .._entity._reload import _self_reload, _self_setter
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from ..common._utils import _fcts_to_dict
  20. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  21. from .job_id import JobId
  22. from .status import Status
  23. if TYPE_CHECKING:
  24. from ..task.task import Task
  25. def _run_callbacks(fn):
  26. def __run_callbacks(job):
  27. fn(job)
  28. _TaipyLogger._get_logger().debug(f"{job.id} status has changed to {job.status}.")
  29. for fct in job._subscribers:
  30. fct(job)
  31. return __run_callbacks
  32. class Job(_Entity, _Labeled):
  33. """Execution of a `Task^`.
  34. Task, Sequence, and Scenario entities can be submitted for execution. The submission
  35. of a scenario triggers the submission of all the contained tasks. Similarly, the submission
  36. of a sequence also triggers the execution of all the ordered tasks.
  37. Every time a task is submitted for execution, a new *Job* is created. A job represents a
  38. single execution of a task. It holds all the information related to the task execution,
  39. including the **creation date**, the execution `Status^`, and the **stacktrace** of any
  40. exception that may be raised by the user function.
  41. In addition, a job notifies scenario or sequence subscribers on its status change.
  42. Attributes:
  43. id (str): The identifier of this job.
  44. task (Task^): The task of this job.
  45. force (bool): Enforce the job's execution whatever the output data nodes are in cache or
  46. not.
  47. status (Status^): The current status of this job.
  48. creation_date (datetime): The date of this job's creation.
  49. stacktrace (List[str]): The list of stacktraces of the exceptions raised during the
  50. execution.
  51. version (str): The string indicates the application version of the job to instantiate.
  52. If not provided, the latest version is used.
  53. """
  54. _MANAGER_NAME = "job"
  55. _ID_PREFIX = "JOB"
  56. def __init__(self, id: JobId, task: "Task", submit_id: str, submit_entity_id: str, force=False, version=None):
  57. self.id = id
  58. self._task = task
  59. self._force = force
  60. self._status = Status.SUBMITTED
  61. self._creation_date = datetime.now()
  62. self._submit_id: str = submit_id
  63. self._submit_entity_id: str = submit_entity_id
  64. self._subscribers: List[Callable] = []
  65. self._stacktrace: List[str] = []
  66. self.__logger = _TaipyLogger._get_logger()
  67. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  68. def get_event_context(self):
  69. return {"task_config_id": self._task.config_id}
  70. @property # type: ignore
  71. @_self_reload(_MANAGER_NAME)
  72. def task(self):
  73. return self._task
  74. @task.setter # type: ignore
  75. @_self_setter(_MANAGER_NAME)
  76. def task(self, val):
  77. self._task = val
  78. @property
  79. def owner_id(self) -> str:
  80. return self.task.id
  81. @property # type: ignore
  82. @_self_reload(_MANAGER_NAME)
  83. def force(self):
  84. return self._force
  85. @force.setter # type: ignore
  86. @_self_setter(_MANAGER_NAME)
  87. def force(self, val):
  88. self._force = val
  89. @property
  90. def submit_id(self):
  91. return self._submit_id
  92. @property
  93. def submit_entity_id(self):
  94. return self._submit_entity_id
  95. @property # type: ignore
  96. def submit_entity(self):
  97. from ..taipy import get as tp_get
  98. return tp_get(self._submit_entity_id)
  99. @property # type: ignore
  100. @_self_reload(_MANAGER_NAME)
  101. def status(self):
  102. return self._status
  103. @status.setter # type: ignore
  104. @_self_setter(_MANAGER_NAME)
  105. def status(self, val):
  106. self._status = val
  107. @property # type: ignore
  108. @_self_reload(_MANAGER_NAME)
  109. def creation_date(self):
  110. return self._creation_date
  111. @creation_date.setter # type: ignore
  112. @_self_setter(_MANAGER_NAME)
  113. def creation_date(self, val):
  114. self._creation_date = val
  115. @property # type: ignore
  116. @_self_reload(_MANAGER_NAME)
  117. def stacktrace(self) -> List[str]:
  118. return self._stacktrace
  119. @stacktrace.setter # type: ignore
  120. @_self_setter(_MANAGER_NAME)
  121. def stacktrace(self, val):
  122. self._stacktrace = val
  123. @property
  124. def version(self):
  125. return self._version
  126. def __contains__(self, task: "Task"):
  127. return self.task.id == task.id
  128. def __lt__(self, other):
  129. return self.creation_date.timestamp() < other.creation_date.timestamp()
  130. def __le__(self, other):
  131. return self.creation_date.timestamp() <= other.creation_date.timestamp()
  132. def __gt__(self, other):
  133. return self.creation_date.timestamp() > other.creation_date.timestamp()
  134. def __ge__(self, other):
  135. return self.creation_date.timestamp() >= other.creation_date.timestamp()
  136. def __eq__(self, other):
  137. return isinstance(other, Job) and self.id == other.id
  138. @_run_callbacks
  139. def blocked(self):
  140. """Set the status to _blocked_ and notify subscribers."""
  141. self.status = Status.BLOCKED
  142. @_run_callbacks
  143. def pending(self):
  144. """Set the status to _pending_ and notify subscribers."""
  145. self.status = Status.PENDING
  146. @_run_callbacks
  147. def running(self):
  148. """Set the status to _running_ and notify subscribers."""
  149. self.status = Status.RUNNING
  150. @_run_callbacks
  151. def canceled(self):
  152. """Set the status to _canceled_ and notify subscribers."""
  153. self.status = Status.CANCELED
  154. @_run_callbacks
  155. def abandoned(self):
  156. """Set the status to _abandoned_ and notify subscribers."""
  157. self.status = Status.ABANDONED
  158. @_run_callbacks
  159. def failed(self):
  160. """Set the status to _failed_ and notify subscribers."""
  161. self.status = Status.FAILED
  162. @_run_callbacks
  163. def completed(self):
  164. """Set the status to _completed_ and notify subscribers."""
  165. self.status = Status.COMPLETED
  166. self.__logger.info(f"job {self.id} is completed.")
  167. @_run_callbacks
  168. def skipped(self):
  169. """Set the status to _skipped_ and notify subscribers."""
  170. self.status = Status.SKIPPED
  171. def is_failed(self) -> bool:
  172. """Indicate if the job has failed.
  173. Returns:
  174. True if the job has failed.
  175. """
  176. return self.status == Status.FAILED
  177. def is_blocked(self) -> bool:
  178. """Indicate if the job is blocked.
  179. Returns:
  180. True if the job is blocked.
  181. """
  182. return self.status == Status.BLOCKED
  183. def is_canceled(self) -> bool:
  184. """Indicate if the job was canceled.
  185. Returns:
  186. True if the job was canceled.
  187. """
  188. return self.status == Status.CANCELED
  189. def is_abandoned(self) -> bool:
  190. """Indicate if the job was abandoned.
  191. Returns:
  192. True if the job was abandoned.
  193. """
  194. return self.status == Status.ABANDONED
  195. def is_submitted(self) -> bool:
  196. """Indicate if the job is submitted.
  197. Returns:
  198. True if the job is submitted.
  199. """
  200. return self.status == Status.SUBMITTED
  201. def is_completed(self) -> bool:
  202. """Indicate if the job has completed.
  203. Returns:
  204. True if the job has completed.
  205. """
  206. return self.status == Status.COMPLETED
  207. def is_skipped(self) -> bool:
  208. """Indicate if the job was skipped.
  209. Returns:
  210. True if the job was skipped.
  211. """
  212. return self.status == Status.SKIPPED
  213. def is_running(self) -> bool:
  214. """Indicate if the job is running.
  215. Returns:
  216. True if the job is running.
  217. """
  218. return self.status == Status.RUNNING
  219. def is_pending(self) -> bool:
  220. """Indicate if the job is pending.
  221. Returns:
  222. True if the job is pending.
  223. """
  224. return self.status == Status.PENDING
  225. def is_finished(self) -> bool:
  226. """Indicate if the job is finished.
  227. Returns:
  228. True if the job is finished.
  229. """
  230. return self.is_completed() or self.is_failed() or self.is_canceled() or self.is_skipped() or self.is_abandoned()
  231. def _is_finished(self) -> bool:
  232. """Indicate if the job is finished. This function will not trigger the persistence feature like is_finished().
  233. Returns:
  234. True if the job is finished.
  235. """
  236. return self._status in [Status.COMPLETED, Status.FAILED, Status.CANCELED, Status.SKIPPED, Status.ABANDONED]
  237. def _on_status_change(self, *functions):
  238. """Get a notification when the status of the job changes.
  239. Job are assigned different statuses (_submitted_, _pending_, etc.) before being finished.
  240. You can be triggered on each change through this function except for the _submitted_
  241. status.
  242. Parameters:
  243. functions: Callables that will be called on each status change.
  244. """
  245. functions = list(functions)
  246. function = functions.pop()
  247. self._subscribers.append(function)
  248. if functions:
  249. self._on_status_change(*functions)
  250. def __hash__(self):
  251. return hash(self.id)
  252. def _unlock_edit_on_outputs(self):
  253. for dn in self.task.output.values():
  254. dn.unlock_edit()
  255. @staticmethod
  256. def _serialize_subscribers(subscribers: List) -> List:
  257. return _fcts_to_dict(subscribers)
  258. def get_label(self) -> str:
  259. """Returns the job simple label prefixed by its owner label.
  260. Returns:
  261. The label of the job as a string.
  262. """
  263. return self._get_label()
  264. def get_simple_label(self) -> str:
  265. """Returns the job simple label.
  266. Returns:
  267. The simple label of the job as a string.
  268. """
  269. return self._get_simple_label()
  270. def is_deletable(self) -> bool:
  271. """Indicate if the job can be deleted.
  272. Returns:
  273. True if the job can be deleted. False otherwise.
  274. """
  275. from ... import core as tp
  276. return tp.is_deletable(self)
  277. @_make_event.register(Job)
  278. def _make_event_for_job(
  279. job: Job,
  280. operation: EventOperation,
  281. /,
  282. attribute_name: Optional[str] = None,
  283. attribute_value: Optional[Any] = None,
  284. **kwargs,
  285. ) -> Event:
  286. metadata = {
  287. "creation_date": job._creation_date,
  288. "task_config_id": job._task.config_id,
  289. "version": job._version,
  290. **kwargs,
  291. }
  292. return Event(
  293. entity_type=EventEntityType.JOB,
  294. entity_id=job.id,
  295. operation=operation,
  296. attribute_name=attribute_name,
  297. attribute_value=attribute_value,
  298. metadata=metadata,
  299. )