_task_manager.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import itertools
  12. from typing import Callable, List, Optional, Type, Union, cast
  13. from taipy.common.config import Config
  14. from .._entity._entity_ids import _EntityIds
  15. from .._manager._manager import _Manager
  16. from .._orchestrator._abstract_orchestrator import _AbstractOrchestrator
  17. from .._repository._abstract_repository import _AbstractRepository
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from .._version._version_mixin import _VersionMixin
  20. from ..common.scope import Scope
  21. from ..common.warn_if_inputs_not_ready import _warn_if_inputs_not_ready
  22. from ..config.task_config import TaskConfig
  23. from ..cycle.cycle_id import CycleId
  24. from ..data._data_manager_factory import _DataManagerFactory
  25. from ..exceptions.exceptions import NonExistingTask
  26. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  27. from ..reason import (
  28. DataNodeEditInProgress,
  29. DataNodeIsNotWritten,
  30. EntityDoesNotExist,
  31. EntityIsNotSubmittableEntity,
  32. ReasonCollection,
  33. )
  34. from ..scenario.scenario_id import ScenarioId
  35. from ..sequence.sequence_id import SequenceId
  36. from ..submission.submission import Submission
  37. from ..task.task import Task
  38. from .task_id import TaskId
  39. class _TaskManager(_Manager[Task], _VersionMixin):
  40. _ENTITY_NAME = Task.__name__
  41. _repository: _AbstractRepository
  42. _EVENT_ENTITY_TYPE = EventEntityType.TASK
  43. @classmethod
  44. def _orchestrator(cls) -> Type[_AbstractOrchestrator]:
  45. from .._orchestrator._orchestrator_factory import _OrchestratorFactory
  46. return _OrchestratorFactory._build_orchestrator()
  47. @classmethod
  48. def _create(cls, task: Task) -> None:
  49. for dn in itertools.chain(task.input.values(), task.output.values()):
  50. _DataManagerFactory._build_manager()._repository._save(dn)
  51. cls._repository._save(task)
  52. @classmethod
  53. def _get_owner_id(
  54. cls, scope, cycle_id, scenario_id
  55. ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
  56. if scope == Scope.SCENARIO:
  57. return scenario_id
  58. elif scope == Scope.CYCLE:
  59. return cycle_id
  60. else:
  61. return None
  62. @classmethod
  63. def _bulk_get_or_create(
  64. cls,
  65. task_configs: List[TaskConfig],
  66. cycle_id: Optional[CycleId] = None,
  67. scenario_id: Optional[ScenarioId] = None,
  68. ) -> List[Task]:
  69. data_node_configs = set()
  70. for task_config in task_configs:
  71. data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.input_configs])
  72. data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.output_configs])
  73. data_nodes = _DataManagerFactory._build_manager()._bulk_get_or_create(
  74. list(data_node_configs), cycle_id, scenario_id
  75. )
  76. tasks_configs_and_owner_id = []
  77. for task_config in task_configs:
  78. task_dn_configs = [Config.data_nodes[dnc.id] for dnc in task_config.output_configs] + [
  79. Config.data_nodes[dnc.id] for dnc in task_config.input_configs
  80. ]
  81. task_config_data_nodes = [data_nodes[dn_config] for dn_config in task_dn_configs]
  82. scope = min(dn.scope for dn in task_config_data_nodes) if len(task_config_data_nodes) != 0 else Scope.GLOBAL
  83. owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
  84. tasks_configs_and_owner_id.append((task_config, owner_id))
  85. tasks_by_config = cls._repository._get_by_configs_and_owner_ids( # type: ignore
  86. tasks_configs_and_owner_id, cls._build_filters_with_version(None)
  87. )
  88. tasks = []
  89. for task_config, owner_id in tasks_configs_and_owner_id:
  90. if task := tasks_by_config.get((task_config, owner_id)):
  91. tasks.append(task)
  92. else:
  93. version = _VersionManagerFactory._build_manager()._get_latest_version()
  94. inputs = [
  95. data_nodes[input_config]
  96. for input_config in [Config.data_nodes[dnc.id] for dnc in task_config.input_configs]
  97. ]
  98. outputs = [
  99. data_nodes[output_config]
  100. for output_config in [Config.data_nodes[dnc.id] for dnc in task_config.output_configs]
  101. ]
  102. skippable = task_config.skippable
  103. task = Task(
  104. str(task_config.id),
  105. dict(**task_config._properties),
  106. cast(Callable, task_config.function),
  107. inputs,
  108. outputs,
  109. owner_id=owner_id,
  110. parent_ids=set(),
  111. version=version,
  112. skippable=skippable,
  113. )
  114. for dn in set(inputs + outputs):
  115. dn._parent_ids.update([task.id])
  116. cls._create(task)
  117. Notifier.publish(_make_event(task, EventOperation.CREATION))
  118. tasks.append(task)
  119. return tasks
  120. @classmethod
  121. def _get_all(cls, version_number: Optional[str] = None) -> List[Task]:
  122. """
  123. Returns all entities.
  124. """
  125. filters = cls._build_filters_with_version(version_number)
  126. return cls._repository._load_all(filters)
  127. @classmethod
  128. def _hard_delete(cls, task_id: TaskId) -> None:
  129. task = cls._get(task_id)
  130. entity_ids_to_delete = cls._get_children_entity_ids(task)
  131. entity_ids_to_delete.task_ids.add(task.id)
  132. cls._delete_entities_of_multiple_types(entity_ids_to_delete)
  133. @classmethod
  134. def _get_children_entity_ids(cls, task: Task) -> _EntityIds:
  135. entity_ids = _EntityIds()
  136. from ..job._job_manager_factory import _JobManagerFactory
  137. from ..submission._submission_manager_factory import _SubmissionManagerFactory
  138. jobs = _JobManagerFactory._build_manager()._get_all()
  139. for job in jobs:
  140. if job.task.id == task.id:
  141. entity_ids.job_ids.add(job.id)
  142. submissions = _SubmissionManagerFactory._build_manager()._get_all()
  143. submitted_entity_ids = list(entity_ids.task_ids)
  144. for submission in submissions:
  145. if submission.entity_id in submitted_entity_ids:
  146. entity_ids.submission_ids.add(submission.id)
  147. return entity_ids
  148. @classmethod
  149. def _is_submittable(cls, task: Union[Task, TaskId]) -> ReasonCollection:
  150. reason_collection = ReasonCollection()
  151. if isinstance(task, str):
  152. task_id = task
  153. task = cls._get(task)
  154. if task is None:
  155. reason_collection._add_reason(task_id, EntityDoesNotExist(task_id))
  156. if not isinstance(task, Task):
  157. reason_collection._add_reason(str(task), EntityIsNotSubmittableEntity(str(task)))
  158. else:
  159. data_manager = _DataManagerFactory._build_manager()
  160. for node in task.input.values():
  161. node = data_manager._get(node)
  162. if node._edit_in_progress:
  163. reason_collection._add_reason(node.id, DataNodeEditInProgress(node.id))
  164. if not node._last_edit_date:
  165. reason_collection._add_reason(node.id, DataNodeIsNotWritten(node.id))
  166. return reason_collection
  167. @classmethod
  168. def _submit(
  169. cls,
  170. task: Union[TaskId, Task],
  171. callbacks: Optional[List[Callable]] = None,
  172. force: bool = False,
  173. wait: bool = False,
  174. timeout: Union[float, int, None] = None,
  175. check_inputs_are_ready: bool = True,
  176. **properties,
  177. ) -> Submission:
  178. task_id = task.id if isinstance(task, Task) else task
  179. task = cls._get(task_id)
  180. if task is None:
  181. raise NonExistingTask(task_id)
  182. if check_inputs_are_ready:
  183. _warn_if_inputs_not_ready(task.input.values())
  184. submission = cls._orchestrator().submit_task(
  185. task, callbacks=callbacks, force=force, wait=wait, timeout=timeout, **properties
  186. )
  187. Notifier.publish(_make_event(task, EventOperation.SUBMISSION))
  188. return submission
  189. @classmethod
  190. def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[Task]:
  191. """
  192. Get all tasks by its config id.
  193. """
  194. filters = cls._build_filters_with_version(version_number)
  195. if not filters:
  196. filters = [{}]
  197. for fil in filters:
  198. fil.update({"config_id": config_id})
  199. return cls._repository._load_all(filters)
  200. @classmethod
  201. def _can_duplicate(cls, task: Union[Task, TaskId]) -> ReasonCollection:
  202. reason_collector = ReasonCollection()
  203. if isinstance(task, Task):
  204. task_id = task.id
  205. else:
  206. task_id = task
  207. if not cls._repository._exists(task_id):
  208. reason_collector._add_reason(task_id, EntityDoesNotExist(task_id))
  209. return reason_collector