_submission_manager.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from threading import Lock
  12. from typing import List, Optional, Union
  13. from taipy.logger._taipy_logger import _TaipyLogger
  14. from .._entity._entity_ids import _EntityIds
  15. from .._manager._manager import _Manager
  16. from .._repository._abstract_repository import _AbstractRepository
  17. from .._version._version_mixin import _VersionMixin
  18. from ..exceptions.exceptions import SubmissionNotDeletedException
  19. from ..job.job import Job, Status
  20. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  21. from ..scenario.scenario import Scenario
  22. from ..sequence.sequence import Sequence
  23. from ..submission.submission import Submission, SubmissionId, SubmissionStatus
  24. from ..task.task import Task
  class _SubmissionManager(_Manager[Submission], _VersionMixin):
      """Manager for `Submission` entities.

      Groups the class-level operations on submissions: listing, creation,
      job-driven status updates, lookup of the latest submission of an entity,
      and (hard) deletion.
      """

      _ENTITY_NAME = Submission.__name__
      _repository: _AbstractRepository
      _EVENT_ENTITY_TYPE = EventEntityType.SUBMISSION
      # Held for the whole duration of `_update_submission_status`.
      __lock = Lock()
      __logger = _TaipyLogger._get_logger()

      @classmethod
      def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
          """
          Returns all entities.

          Parameters:
              version_number: Optional version used to build the repository filters.
          """
          filters = cls._build_filters_with_version(version_number)
          return cls._repository._load_all(filters)

      @classmethod
      def _create(cls, entity_id: str, entity_type: str, entity_config: Optional[str], **properties) -> Submission:
          """Build a `Submission`, store it with `_set` and publish a CREATION event.

          Parameters:
              entity_id: Identifier of the submitted entity.
              entity_type: Type of the submitted entity.
              entity_config: Passed to `Submission` as `entity_config_id`.
              **properties: Forwarded to `Submission` as its `properties`.

          Returns:
              The newly created submission.
          """
          submission = Submission(
              entity_id=entity_id, entity_type=entity_type, entity_config_id=entity_config, properties=properties
          )
          cls._set(submission)
          Notifier.publish(_make_event(submission, EventOperation.CREATION))
          return submission

      @classmethod
      def _update_submission_status(cls, submission: Submission, job: Job) -> None:
          """Update the job bookkeeping and the status of `submission` from `job.status`.

          The whole update runs under the class lock. A submission that is already
          FAILED is left untouched. A FAILED job marks the submission as FAILED
          immediately. Any other job status first updates the blocked / pending /
          running job sets and the canceled / completed / abandoned flags, then
          the submission status is derived from that bookkeeping.

          Parameters:
              submission: The submission to update; it is re-read through `_get`.
              job: The job whose status changed.
          """
          with cls.__lock:
              submission = cls._get(submission)

              if submission._submission_status == SubmissionStatus.FAILED:
                  return

              job_status = job.status
              if job_status == Status.FAILED:
                  # Go through the same helper as every other transition so that the
                  # "submission_status" UPDATE event is also emitted on failure.
                  cls.__set_submission_status(submission, SubmissionStatus.FAILED, job)
                  cls.__logger.debug(
                      f"{job.id} status is {job_status}. Submission status set to `{SubmissionStatus.FAILED}`."
                  )
                  return

              if job_status == Status.CANCELED:
                  submission._is_canceled = True
              elif job_status == Status.BLOCKED:
                  submission._blocked_jobs.add(job.id)
                  submission._pending_jobs.discard(job.id)
              elif job_status in (Status.PENDING, Status.SUBMITTED):
                  submission._pending_jobs.add(job.id)
                  submission._blocked_jobs.discard(job.id)
              elif job_status == Status.RUNNING:
                  submission._running_jobs.add(job.id)
                  submission._pending_jobs.discard(job.id)
              elif job_status in (Status.COMPLETED, Status.SKIPPED):
                  submission._is_completed = True  # type: ignore
                  submission._blocked_jobs.discard(job.id)
                  submission._pending_jobs.discard(job.id)
                  submission._running_jobs.discard(job.id)
              elif job_status == Status.ABANDONED:
                  submission._is_abandoned = True  # type: ignore
                  submission._running_jobs.discard(job.id)
                  submission._blocked_jobs.discard(job.id)
                  submission._pending_jobs.discard(job.id)
              cls._set(submission)

              # The submission_status is set later to make sure notification for updating
              # the submission_status attribute is triggered
              new_status = cls.__resolve_submission_status(submission)
              cls.__set_submission_status(submission, new_status, job)
              # Log the value that was just applied: `__set_submission_status` may work on a
              # re-fetched object, so the local `submission` is not guaranteed to carry it.
              cls.__logger.debug(f"{job.id} status is {job_status}. Submission status set to `{new_status}`")

      @staticmethod
      def __resolve_submission_status(submission: Submission) -> SubmissionStatus:
          """Derive the submission status from its flags and job sets, highest priority first."""
          if submission._is_canceled:
              return SubmissionStatus.CANCELED
          if submission._is_abandoned:
              return SubmissionStatus.UNDEFINED
          if submission._running_jobs:
              return SubmissionStatus.RUNNING
          if submission._pending_jobs:
              return SubmissionStatus.PENDING
          if submission._blocked_jobs:
              return SubmissionStatus.BLOCKED
          if submission._is_completed:
              return SubmissionStatus.COMPLETED
          return SubmissionStatus.UNDEFINED

      @classmethod
      def __set_submission_status(cls, submission: Submission, new_submission_status: SubmissionStatus, job: Job) -> None:
          """Store `new_submission_status` on the submission and emit an UPDATE event if it changed.

          When the submission is not in context it is re-read through `_get` before
          being modified, and the event is published right away. When it is in
          context, the event is appended to the submission's collector instead.

          Parameters:
              submission: The submission to update.
              new_submission_status: The status to store.
              job: The job that triggered the change; its id is attached to the event.
          """
          if not submission._is_in_context:
              submission = cls._get(submission)
          previous_status = submission._submission_status
          submission._submission_status = new_submission_status

          cls._set(submission)

          if previous_status == submission._submission_status:
              return

          event = _make_event(
              submission,
              EventOperation.UPDATE,
              "submission_status",
              submission._submission_status,
              job_triggered_submission_status_changed=job.id,
          )
          if submission._is_in_context:
              submission._in_context_attributes_changed_collector.append(event)
          else:
              Notifier.publish(event)

      @classmethod
      def _get_latest(cls, entity: Union[Scenario, Sequence, Task, str]) -> Optional[Submission]:
          """Return the greatest submission (per `Submission` ordering) made for `entity`.

          Parameters:
              entity: The submitted entity, or directly its identifier.

          Returns:
              The maximum of the matching submissions, or None when there is none.
          """
          entity_id = entity if isinstance(entity, str) else entity.id
          matching = [submission for submission in cls._get_all() if submission.entity_id == entity_id]
          return max(matching, default=None)

      @classmethod
      def _delete(cls, submission: Union[Submission, SubmissionId]) -> None:
          """Delete `submission` if `_is_deletable` allows it.

          Parameters:
              submission: The submission, or its identifier.

          Raises:
              SubmissionNotDeletedException: If the submission is not deletable.
          """
          if isinstance(submission, str):
              submission = cls._get(submission)
          if not cls._is_deletable(submission):
              err = SubmissionNotDeletedException(submission.id)
              # NOTE(review): `_logger` is not defined in this class (only `__logger` is);
              # presumably inherited from `_Manager` -- confirm.
              cls._logger.error(err)
              raise err
          super()._delete(submission.id)

      @classmethod
      def _hard_delete(cls, submission_id: SubmissionId) -> None:
          """Delete the submission together with the entities listed by `_get_children_entity_ids`.

          Parameters:
              submission_id: Identifier of the submission to delete.
          """
          submission = cls._get(submission_id)
          entity_ids_to_delete = cls._get_children_entity_ids(submission)
          entity_ids_to_delete.submission_ids.add(submission.id)
          cls._delete_entities_of_multiple_types(entity_ids_to_delete)

      @classmethod
      def _get_children_entity_ids(cls, submission: Submission) -> _EntityIds:
          """Collect the ids of the jobs of `submission` into a fresh `_EntityIds`."""
          entity_ids = _EntityIds()
          for job in submission.jobs:
              entity_ids.job_ids.add(job.id)
          return entity_ids

      @classmethod
      def _is_deletable(cls, submission: Union[Submission, SubmissionId]) -> bool:
          """Tell whether `submission` is finished or has the UNDEFINED status.

          Parameters:
              submission: The submission, or its identifier.
          """
          if isinstance(submission, str):
              submission = cls._get(submission)
          return submission.is_finished() or submission.submission_status == SubmissionStatus.UNDEFINED