_job_manager.py 3.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import uuid
  12. from typing import Callable, Iterable, List, Optional, Union
  13. from .._manager._manager import _Manager
  14. from .._repository._abstract_repository import _AbstractRepository
  15. from .._version._version_manager_factory import _VersionManagerFactory
  16. from .._version._version_mixin import _VersionMixin
  17. from ..exceptions.exceptions import JobNotDeletedException
  18. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  19. from ..task.task import Task
  20. from .job import Job
  21. from .job_id import JobId
  22. class _JobManager(_Manager[Job], _VersionMixin):
  23. _ENTITY_NAME = Job.__name__
  24. _ID_PREFIX = "JOB_"
  25. _repository: _AbstractRepository
  26. _EVENT_ENTITY_TYPE = EventEntityType.JOB
  27. @classmethod
  28. def _get_all(cls, version_number: Optional[str] = None) -> List[Job]:
  29. """
  30. Returns all entities.
  31. """
  32. filters = cls._build_filters_with_version(version_number)
  33. return cls._repository._load_all(filters)
  34. @classmethod
  35. def _create(
  36. cls, task: Task, callbacks: Iterable[Callable], submit_id: str, submit_entity_id: str, force=False
  37. ) -> Job:
  38. version = _VersionManagerFactory._build_manager()._get_latest_version()
  39. job = Job(
  40. id=JobId(f"{Job._ID_PREFIX}_{task.config_id}_{uuid.uuid4()}"),
  41. task=task,
  42. submit_id=submit_id,
  43. submit_entity_id=submit_entity_id,
  44. force=force,
  45. version=version,
  46. )
  47. Notifier.publish(_make_event(job, EventOperation.CREATION))
  48. job._on_status_change(*callbacks)
  49. cls._set(job)
  50. return job
  51. @classmethod
  52. def _delete(cls, job: Job, force=False):
  53. if cls._is_deletable(job) or force:
  54. super()._delete(job.id)
  55. from .._orchestrator._dispatcher._job_dispatcher import _JobDispatcher
  56. _JobDispatcher._pop_dispatched_process(job.id)
  57. else:
  58. err = JobNotDeletedException(job.id)
  59. cls._logger.error(err)
  60. raise err
  61. @classmethod
  62. def _cancel(cls, job: Union[str, Job]):
  63. job = cls._get(job) if isinstance(job, str) else job
  64. from .._orchestrator._orchestrator_factory import _OrchestratorFactory
  65. _OrchestratorFactory._build_orchestrator().cancel_job(job)
  66. @classmethod
  67. def _get_latest(cls, task: Task) -> Optional[Job]:
  68. jobs_of_task = list(filter(lambda job: task in job, cls._get_all()))
  69. if len(jobs_of_task) == 0:
  70. return None
  71. if len(jobs_of_task) == 1:
  72. return jobs_of_task[0]
  73. else:
  74. return max(jobs_of_task)
  75. @classmethod
  76. def _is_deletable(cls, job: Union[Job, JobId]) -> bool:
  77. if isinstance(job, str):
  78. job = cls._get(job)
  79. if not job.is_finished():
  80. return False
  81. return True