123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from threading import Lock
- from typing import List, Optional, Union
- from taipy.logger._taipy_logger import _TaipyLogger
- from .._entity._entity_ids import _EntityIds
- from .._manager._manager import _Manager
- from .._repository._abstract_repository import _AbstractRepository
- from .._version._version_mixin import _VersionMixin
- from ..exceptions.exceptions import SubmissionNotDeletedException
- from ..job.job import Job, Status
- from ..notification import EventEntityType, EventOperation, Notifier, _make_event
- from ..scenario.scenario import Scenario
- from ..sequence.sequence import Sequence
- from ..submission.submission import Submission, SubmissionId, SubmissionStatus
- from ..task.task import Task
- class _SubmissionManager(_Manager[Submission], _VersionMixin):
- _ENTITY_NAME = Submission.__name__
- _repository: _AbstractRepository
- _EVENT_ENTITY_TYPE = EventEntityType.SUBMISSION
- __lock = Lock()
- __logger = _TaipyLogger._get_logger()
- @classmethod
- def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
- """
- Returns all entities.
- """
- filters = cls._build_filters_with_version(version_number)
- return cls._repository._load_all(filters)
- @classmethod
- def _create(cls, entity_id: str, entity_type: str, entity_config: Optional[str], **properties) -> Submission:
- submission = Submission(
- entity_id=entity_id, entity_type=entity_type, entity_config_id=entity_config, properties=properties
- )
- cls._set(submission)
- Notifier.publish(_make_event(submission, EventOperation.CREATION))
- return submission
- @classmethod
- def _update_submission_status(cls, submission: Submission, job: Job) -> None:
- with cls.__lock:
- submission = cls._get(submission)
- if submission._submission_status == SubmissionStatus.FAILED:
- return
- job_status = job.status
- if job_status == Status.FAILED:
- submission._submission_status = SubmissionStatus.FAILED
- cls._set(submission)
- cls.__logger.debug(
- f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`."
- )
- return
- if job_status == Status.CANCELED:
- submission._is_canceled = True
- elif job_status == Status.BLOCKED:
- submission._blocked_jobs.add(job.id)
- submission._pending_jobs.discard(job.id)
- elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
- submission._pending_jobs.add(job.id)
- submission._blocked_jobs.discard(job.id)
- elif job_status == Status.RUNNING:
- submission._running_jobs.add(job.id)
- submission._pending_jobs.discard(job.id)
- elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
- submission._is_completed = True # type: ignore
- submission._blocked_jobs.discard(job.id)
- submission._pending_jobs.discard(job.id)
- submission._running_jobs.discard(job.id)
- elif job_status == Status.ABANDONED:
- submission._is_abandoned = True # type: ignore
- submission._running_jobs.discard(job.id)
- submission._blocked_jobs.discard(job.id)
- submission._pending_jobs.discard(job.id)
- cls._set(submission)
- # The submission_status is set later to make sure notification for updating
- # the submission_status attribute is triggered
- if submission._is_canceled:
- cls.__set_submission_status(submission, SubmissionStatus.CANCELED, job)
- elif submission._is_abandoned:
- cls.__set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
- elif submission._running_jobs:
- cls.__set_submission_status(submission, SubmissionStatus.RUNNING, job)
- elif submission._pending_jobs:
- cls.__set_submission_status(submission, SubmissionStatus.PENDING, job)
- elif submission._blocked_jobs:
- cls.__set_submission_status(submission, SubmissionStatus.BLOCKED, job)
- elif submission._is_completed:
- cls.__set_submission_status(submission, SubmissionStatus.COMPLETED, job)
- else:
- cls.__set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
- cls.__logger.debug(
- f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`"
- )
- @classmethod
- def __set_submission_status(cls, submission: Submission, new_submission_status: SubmissionStatus, job: Job) -> None:
- if not submission._is_in_context:
- submission = cls._get(submission)
- _current_submission_status = submission._submission_status
- submission._submission_status = new_submission_status
- cls._set(submission)
- if _current_submission_status != submission._submission_status:
- event = _make_event(
- submission,
- EventOperation.UPDATE,
- "submission_status",
- submission._submission_status,
- job_triggered_submission_status_changed=job.id,
- )
- if not submission._is_in_context:
- Notifier.publish(event)
- else:
- submission._in_context_attributes_changed_collector.append(event)
- @classmethod
- def _get_latest(cls, entity: Union[Scenario, Sequence, Task]) -> Optional[Submission]:
- entity_id = entity.id if not isinstance(entity, str) else entity
- submissions_of_task = list(filter(lambda submission: submission.entity_id == entity_id, cls._get_all()))
- if len(submissions_of_task) == 0:
- return None
- if len(submissions_of_task) == 1:
- return submissions_of_task[0]
- else:
- return max(submissions_of_task)
- @classmethod
- def _delete(cls, submission: Union[Submission, SubmissionId]) -> None:
- if isinstance(submission, str):
- submission = cls._get(submission)
- if cls._is_deletable(submission):
- super()._delete(submission.id)
- else:
- err = SubmissionNotDeletedException(submission.id)
- cls._logger.error(err)
- raise err
- @classmethod
- def _hard_delete(cls, submission_id: SubmissionId) -> None:
- submission = cls._get(submission_id)
- entity_ids_to_delete = cls._get_children_entity_ids(submission)
- entity_ids_to_delete.submission_ids.add(submission.id)
- cls._delete_entities_of_multiple_types(entity_ids_to_delete)
- @classmethod
- def _get_children_entity_ids(cls, submission: Submission) -> _EntityIds:
- entity_ids = _EntityIds()
- for job in submission.jobs:
- entity_ids.job_ids.add(job.id)
- return entity_ids
- @classmethod
- def _is_deletable(cls, submission: Union[Submission, SubmissionId]) -> bool:
- if isinstance(submission, str):
- submission = cls._get(submission)
- return submission.is_finished() or submission.submission_status == SubmissionStatus.UNDEFINED
|