_job_manager.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import uuid
  12. from typing import Callable, Iterable, List, Optional, Union
  13. from .._manager._manager import _Manager
  14. from .._repository._abstract_repository import _AbstractRepository
  15. from .._version._version_manager_factory import _VersionManagerFactory
  16. from .._version._version_mixin import _VersionMixin
  17. from ..exceptions.exceptions import JobNotDeletedException
  18. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  19. from ..reason import JobIsNotFinished, ReasonCollection
  20. from ..task.task import Task
  21. from .job import Job
  22. from .job_id import JobId
  23. class _JobManager(_Manager[Job], _VersionMixin):
  24. _ENTITY_NAME = Job.__name__
  25. _ID_PREFIX = "JOB_"
  26. _repository: _AbstractRepository
  27. _EVENT_ENTITY_TYPE = EventEntityType.JOB
  28. @classmethod
  29. def _get_all(cls, version_number: Optional[str] = None) -> List[Job]:
  30. """
  31. Returns all entities.
  32. """
  33. filters = cls._build_filters_with_version(version_number)
  34. return cls._repository._load_all(filters)
  35. @classmethod
  36. def _create(
  37. cls, task: Task, callbacks: Iterable[Callable], submit_id: str, submit_entity_id: str, force=False
  38. ) -> Job:
  39. version = _VersionManagerFactory._build_manager()._get_latest_version()
  40. job = Job(
  41. id=JobId(f"{Job._ID_PREFIX}_{task.config_id}_{uuid.uuid4()}"),
  42. task=task,
  43. submit_id=submit_id,
  44. submit_entity_id=submit_entity_id,
  45. force=force,
  46. version=version,
  47. )
  48. job._on_status_change(*callbacks)
  49. cls._set(job)
  50. Notifier.publish(_make_event(job, EventOperation.CREATION))
  51. return job
  52. @classmethod
  53. def _delete(cls, job: Union[Job, JobId], force=False) -> None:
  54. if isinstance(job, str):
  55. job = cls._get(job)
  56. if cls._is_deletable(job) or force:
  57. super()._delete(job.id)
  58. else:
  59. err = JobNotDeletedException(job.id)
  60. cls._logger.error(err)
  61. raise err
  62. @classmethod
  63. def _cancel(cls, job: Union[str, Job]) -> None:
  64. job = cls._get(job) if isinstance(job, str) else job
  65. from .._orchestrator._orchestrator_factory import _OrchestratorFactory
  66. _OrchestratorFactory._build_orchestrator().cancel_job(job)
  67. @classmethod
  68. def _get_latest(cls, task: Task) -> Optional[Job]:
  69. jobs_of_task = list(filter(lambda job: task in job, cls._get_all()))
  70. if len(jobs_of_task) == 0:
  71. return None
  72. if len(jobs_of_task) == 1:
  73. return jobs_of_task[0]
  74. else:
  75. return max(jobs_of_task)
  76. @classmethod
  77. def _is_deletable(cls, job: Union[Job, JobId]) -> ReasonCollection:
  78. reason_collector = ReasonCollection()
  79. if isinstance(job, str):
  80. job = cls._get(job)
  81. if not job.is_finished():
  82. reason_collector._add_reason(job.id, JobIsNotFinished(job.id))
  83. return reason_collector