submission.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import threading
  12. import uuid
  13. from datetime import datetime
  14. from typing import Any, Dict, List, Optional, Set, Union
  15. from .._entity._entity import _Entity
  16. from .._entity._labeled import _Labeled
  17. from .._entity._properties import _Properties
  18. from .._entity._reload import _Reloader, _self_reload, _self_setter
  19. from .._version._version_manager_factory import _VersionManagerFactory
  20. from ..job.job import Job, JobId, Status
  21. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  22. from .submission_id import SubmissionId
  23. from .submission_status import SubmissionStatus
  24. class Submission(_Entity, _Labeled):
  25. """Hold the jobs and submission status when a Scenario^, Sequence^ or Task^ is submitted.
  26. Attributes:
  27. entity_id (str): The identifier of the entity that was submitted.
  28. id (str): The identifier of the `Submission^` entity.
  29. jobs (Optional[Union[List[Job], List[JobId]]]): A list of jobs.
  30. properties (dict[str, Any]): A dictionary of additional properties.
  31. creation_date (Optional[datetime]): The date of this submission's creation.
  32. submission_status (Optional[SubmissionStatus]): The current status of this submission.
  33. version (Optional[str]): The string indicates the application version of the submission to instantiate.
  34. If not provided, the latest version is used.
  35. """
  36. _ID_PREFIX = "SUBMISSION"
  37. _MANAGER_NAME = "submission"
  38. __SEPARATOR = "_"
  39. lock = threading.Lock()
  40. def __init__(
  41. self,
  42. entity_id: str,
  43. entity_type: str,
  44. entity_config_id: Optional[str] = None,
  45. id: Optional[str] = None,
  46. jobs: Optional[Union[List[Job], List[JobId]]] = None,
  47. properties: Optional[Dict[str, Any]] = None,
  48. creation_date: Optional[datetime] = None,
  49. submission_status: Optional[SubmissionStatus] = None,
  50. version: Optional[str] = None,
  51. ):
  52. self._entity_id = entity_id
  53. self._entity_type = entity_type
  54. self._entity_config_id = entity_config_id
  55. self.id = id or self.__new_id()
  56. self._jobs: Union[List[Job], List[JobId], List] = jobs or []
  57. self._creation_date = creation_date or datetime.now()
  58. self._submission_status = submission_status or SubmissionStatus.SUBMITTED
  59. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  60. properties = properties or {}
  61. self._properties = _Properties(self, **properties.copy())
  62. self._is_abandoned = False
  63. self._is_completed = False
  64. self._is_canceled = False
  65. self._running_jobs: Set = set()
  66. self._blocked_jobs: Set = set()
  67. self._pending_jobs: Set = set()
  68. @staticmethod
  69. def __new_id() -> str:
  70. """Generate a unique Submission identifier."""
  71. return SubmissionId(Submission.__SEPARATOR.join([Submission._ID_PREFIX, str(uuid.uuid4())]))
  72. @property
  73. def entity_id(self) -> str:
  74. return self._entity_id
  75. @property
  76. def entity_type(self) -> str:
  77. return self._entity_type
  78. @property
  79. def entity_config_id(self) -> Optional[str]:
  80. return self._entity_config_id
  81. @property
  82. def properties(self):
  83. self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
  84. return self._properties
  85. @property
  86. def creation_date(self):
  87. return self._creation_date
  88. def get_label(self) -> str:
  89. """Returns the submission simple label prefixed by its owner label.
  90. Returns:
  91. The label of the submission as a string.
  92. """
  93. return self._get_label()
  94. def get_simple_label(self) -> str:
  95. """Returns the submission simple label.
  96. Returns:
  97. The simple label of the submission as a string.
  98. """
  99. return self._get_simple_label()
  100. @property # type: ignore
  101. @_self_reload(_MANAGER_NAME)
  102. def jobs(self) -> List[Job]:
  103. from ..job._job_manager_factory import _JobManagerFactory
  104. jobs = []
  105. job_manager = _JobManagerFactory._build_manager()
  106. for job in self._jobs:
  107. jobs.append(job_manager._get(job))
  108. return jobs
  109. @jobs.setter # type: ignore
  110. @_self_setter(_MANAGER_NAME)
  111. def jobs(self, jobs: Union[List[Job], List[JobId]]):
  112. self._jobs = jobs
  113. def __hash__(self):
  114. return hash(self.id)
  115. def __eq__(self, other):
  116. return self.id == other.id
  117. @property # type: ignore
  118. @_self_reload(_MANAGER_NAME)
  119. def submission_status(self) -> SubmissionStatus:
  120. return self._submission_status
  121. @submission_status.setter # type: ignore
  122. @_self_setter(_MANAGER_NAME)
  123. def submission_status(self, submission_status):
  124. self._submission_status = submission_status
  125. @property # type: ignore
  126. @_self_reload(_MANAGER_NAME)
  127. def is_abandoned(self) -> bool:
  128. return self._is_abandoned
  129. @is_abandoned.setter # type: ignore
  130. @_self_setter(_MANAGER_NAME)
  131. def is_abandoned(self, val):
  132. self._is_abandoned = val
  133. @property # type: ignore
  134. @_self_reload(_MANAGER_NAME)
  135. def is_completed(self) -> bool:
  136. return self._is_completed
  137. @is_completed.setter # type: ignore
  138. @_self_setter(_MANAGER_NAME)
  139. def is_completed(self, val):
  140. self._is_completed = val
  141. @property # type: ignore
  142. @_self_reload(_MANAGER_NAME)
  143. def is_canceled(self) -> bool:
  144. return self._is_canceled
  145. @is_canceled.setter # type: ignore
  146. @_self_setter(_MANAGER_NAME)
  147. def is_canceled(self, val):
  148. self._is_canceled = val
  149. def __lt__(self, other):
  150. return self.creation_date.timestamp() < other.creation_date.timestamp()
  151. def __le__(self, other):
  152. return self.creation_date.timestamp() <= other.creation_date.timestamp()
  153. def __gt__(self, other):
  154. return self.creation_date.timestamp() > other.creation_date.timestamp()
  155. def __ge__(self, other):
  156. return self.creation_date.timestamp() >= other.creation_date.timestamp()
  157. def _update_submission_status(self, job: Job):
  158. from ._submission_manager_factory import _SubmissionManagerFactory
  159. submission_manager = _SubmissionManagerFactory._build_manager()
  160. submission = submission_manager._get(self)
  161. if submission._submission_status == SubmissionStatus.FAILED:
  162. return
  163. with self.lock:
  164. job_status = job.status
  165. if job_status == Status.FAILED:
  166. submission._submission_status = SubmissionStatus.FAILED
  167. _SubmissionManagerFactory._build_manager()._set(submission)
  168. return
  169. if job_status == Status.CANCELED:
  170. submission._is_canceled = True
  171. elif job_status == Status.BLOCKED:
  172. submission._blocked_jobs.add(job.id)
  173. submission._pending_jobs.discard(job.id)
  174. elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
  175. submission._pending_jobs.add(job.id)
  176. submission._blocked_jobs.discard(job.id)
  177. elif job_status == Status.RUNNING:
  178. submission._running_jobs.add(job.id)
  179. submission._pending_jobs.discard(job.id)
  180. elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
  181. submission._is_completed = True # type: ignore
  182. submission._blocked_jobs.discard(job.id)
  183. submission._pending_jobs.discard(job.id)
  184. submission._running_jobs.discard(job.id)
  185. elif job_status == Status.ABANDONED:
  186. submission._is_abandoned = True # type: ignore
  187. submission._running_jobs.discard(job.id)
  188. submission._blocked_jobs.discard(job.id)
  189. submission._pending_jobs.discard(job.id)
  190. submission_manager._set(submission)
  191. # The submission_status is set later to make sure notification for updating
  192. # the submission_status attribute is triggered
  193. if submission._is_canceled:
  194. submission.submission_status = SubmissionStatus.CANCELED
  195. elif submission._is_abandoned:
  196. submission.submission_status = SubmissionStatus.UNDEFINED
  197. elif submission._running_jobs:
  198. submission.submission_status = SubmissionStatus.RUNNING
  199. elif submission._pending_jobs:
  200. submission.submission_status = SubmissionStatus.PENDING
  201. elif submission._blocked_jobs:
  202. submission.submission_status = SubmissionStatus.BLOCKED
  203. elif submission._is_completed:
  204. submission.submission_status = SubmissionStatus.COMPLETED
  205. else:
  206. submission.submission_status = SubmissionStatus.UNDEFINED
  207. def is_finished(self) -> bool:
  208. """Indicate if the submission is finished.
  209. Returns:
  210. True if the submission is finished.
  211. """
  212. return self.submission_status in [
  213. SubmissionStatus.COMPLETED,
  214. SubmissionStatus.FAILED,
  215. SubmissionStatus.CANCELED,
  216. ]
  217. def is_deletable(self) -> bool:
  218. """Indicate if the submission can be deleted.
  219. Returns:
  220. True if the submission can be deleted. False otherwise.
  221. """
  222. from ... import core as tp
  223. return tp.is_deletable(self)
  224. @_make_event.register(Submission)
  225. def _make_event_for_submission(
  226. submission: Submission,
  227. operation: EventOperation,
  228. /,
  229. attribute_name: Optional[str] = None,
  230. attribute_value: Optional[Any] = None,
  231. **kwargs,
  232. ) -> Event:
  233. metadata = {
  234. "origin_entity_id": submission.entity_id,
  235. "origin_entity_type": submission.entity_type,
  236. "origin_entity_config_id": submission.entity_config_id,
  237. "creation_date": submission.creation_date,
  238. "version": submission._version,
  239. **kwargs,
  240. }
  241. return Event(
  242. entity_type=EventEntityType.SUBMISSION,
  243. entity_id=submission.id,
  244. operation=operation,
  245. attribute_name=attribute_name,
  246. attribute_value=attribute_value,
  247. metadata=metadata,
  248. )