_task_manager.py 8.8 KB


  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from typing import Callable, List, Optional, Type, Union, cast
  12. from taipy.config import Config
  13. from taipy.config.common.scope import Scope
  14. from .._entity._entity_ids import _EntityIds
  15. from .._manager._manager import _Manager
  16. from .._orchestrator._abstract_orchestrator import _AbstractOrchestrator
  17. from .._repository._abstract_repository import _AbstractRepository
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from .._version._version_mixin import _VersionMixin
  20. from ..common.warn_if_inputs_not_ready import _warn_if_inputs_not_ready
  21. from ..config.task_config import TaskConfig
  22. from ..cycle.cycle_id import CycleId
  23. from ..data._data_manager_factory import _DataManagerFactory
  24. from ..exceptions.exceptions import NonExistingTask
  25. from ..notification import EventEntityType, EventOperation, Notifier, _make_event
  26. from ..reason import DataNodeEditInProgress, DataNodeIsNotWritten, EntityIsNotSubmittableEntity, ReasonCollection
  27. from ..scenario.scenario_id import ScenarioId
  28. from ..sequence.sequence_id import SequenceId
  29. from ..submission.submission import Submission
  30. from ..task.task import Task
  31. from .task_id import TaskId
  32. class _TaskManager(_Manager[Task], _VersionMixin):
  33. _ENTITY_NAME = Task.__name__
  34. _repository: _AbstractRepository
  35. _EVENT_ENTITY_TYPE = EventEntityType.TASK
  36. @classmethod
  37. def _orchestrator(cls) -> Type[_AbstractOrchestrator]:
  38. from .._orchestrator._orchestrator_factory import _OrchestratorFactory
  39. return _OrchestratorFactory._build_orchestrator()
  40. @classmethod
  41. def _set(cls, task: Task) -> None:
  42. cls.__save_data_nodes(task.input.values())
  43. cls.__save_data_nodes(task.output.values())
  44. super()._set(task)
  45. @classmethod
  46. def _bulk_get_or_create(
  47. cls,
  48. task_configs: List[TaskConfig],
  49. cycle_id: Optional[CycleId] = None,
  50. scenario_id: Optional[ScenarioId] = None,
  51. ) -> List[Task]:
  52. data_node_configs = set()
  53. for task_config in task_configs:
  54. data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.input_configs])
  55. data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.output_configs])
  56. data_nodes = _DataManagerFactory._build_manager()._bulk_get_or_create(
  57. list(data_node_configs), cycle_id, scenario_id
  58. )
  59. tasks_configs_and_owner_id = []
  60. for task_config in task_configs:
  61. task_dn_configs = [Config.data_nodes[dnc.id] for dnc in task_config.output_configs] + [
  62. Config.data_nodes[dnc.id] for dnc in task_config.input_configs
  63. ]
  64. task_config_data_nodes = [data_nodes[dn_config] for dn_config in task_dn_configs]
  65. scope = min(dn.scope for dn in task_config_data_nodes) if len(task_config_data_nodes) != 0 else Scope.GLOBAL
  66. owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
  67. if scope == Scope.SCENARIO:
  68. owner_id = scenario_id
  69. elif scope == Scope.CYCLE:
  70. owner_id = cycle_id
  71. else:
  72. owner_id = None
  73. tasks_configs_and_owner_id.append((task_config, owner_id))
  74. tasks_by_config = cls._repository._get_by_configs_and_owner_ids( # type: ignore
  75. tasks_configs_and_owner_id, cls._build_filters_with_version(None)
  76. )
  77. tasks = []
  78. for task_config, owner_id in tasks_configs_and_owner_id:
  79. if task := tasks_by_config.get((task_config, owner_id)):
  80. tasks.append(task)
  81. else:
  82. version = _VersionManagerFactory._build_manager()._get_latest_version()
  83. inputs = [
  84. data_nodes[input_config]
  85. for input_config in [Config.data_nodes[dnc.id] for dnc in task_config.input_configs]
  86. ]
  87. outputs = [
  88. data_nodes[output_config]
  89. for output_config in [Config.data_nodes[dnc.id] for dnc in task_config.output_configs]
  90. ]
  91. skippable = task_config.skippable
  92. task = Task(
  93. str(task_config.id),
  94. dict(**task_config._properties),
  95. cast(Callable, task_config.function),
  96. inputs,
  97. outputs,
  98. owner_id=owner_id,
  99. parent_ids=set(),
  100. version=version,
  101. skippable=skippable,
  102. )
  103. for dn in set(inputs + outputs):
  104. dn._parent_ids.update([task.id])
  105. cls._set(task)
  106. Notifier.publish(_make_event(task, EventOperation.CREATION))
  107. tasks.append(task)
  108. return tasks
  109. @classmethod
  110. def _get_all(cls, version_number: Optional[str] = None) -> List[Task]:
  111. """
  112. Returns all entities.
  113. """
  114. filters = cls._build_filters_with_version(version_number)
  115. return cls._repository._load_all(filters)
  116. @classmethod
  117. def __save_data_nodes(cls, data_nodes) -> None:
  118. data_manager = _DataManagerFactory._build_manager()
  119. for i in data_nodes:
  120. data_manager._set(i)
  121. @classmethod
  122. def _hard_delete(cls, task_id: TaskId) -> None:
  123. task = cls._get(task_id)
  124. entity_ids_to_delete = cls._get_children_entity_ids(task)
  125. entity_ids_to_delete.task_ids.add(task.id)
  126. cls._delete_entities_of_multiple_types(entity_ids_to_delete)
  127. @classmethod
  128. def _get_children_entity_ids(cls, task: Task) -> _EntityIds:
  129. entity_ids = _EntityIds()
  130. from ..job._job_manager_factory import _JobManagerFactory
  131. from ..submission._submission_manager_factory import _SubmissionManagerFactory
  132. jobs = _JobManagerFactory._build_manager()._get_all()
  133. for job in jobs:
  134. if job.task.id == task.id:
  135. entity_ids.job_ids.add(job.id)
  136. submissions = _SubmissionManagerFactory._build_manager()._get_all()
  137. submitted_entity_ids = list(entity_ids.task_ids)
  138. for submission in submissions:
  139. if submission.entity_id in submitted_entity_ids:
  140. entity_ids.submission_ids.add(submission.id)
  141. return entity_ids
  142. @classmethod
  143. def _is_submittable(cls, task: Union[Task, TaskId]) -> ReasonCollection:
  144. if isinstance(task, str):
  145. task = cls._get(task)
  146. reason_collection = ReasonCollection()
  147. if not isinstance(task, Task):
  148. task = str(task)
  149. reason_collection._add_reason(task, EntityIsNotSubmittableEntity(task))
  150. else:
  151. data_manager = _DataManagerFactory._build_manager()
  152. for node in task.input.values():
  153. node = data_manager._get(node)
  154. if node._edit_in_progress:
  155. reason_collection._add_reason(node.id, DataNodeEditInProgress(node.id))
  156. if not node._last_edit_date:
  157. reason_collection._add_reason(node.id, DataNodeIsNotWritten(node.id))
  158. return reason_collection
  159. @classmethod
  160. def _submit(
  161. cls,
  162. task: Union[TaskId, Task],
  163. callbacks: Optional[List[Callable]] = None,
  164. force: bool = False,
  165. wait: bool = False,
  166. timeout: Optional[Union[float, int]] = None,
  167. check_inputs_are_ready: bool = True,
  168. **properties,
  169. ) -> Submission:
  170. task_id = task.id if isinstance(task, Task) else task
  171. task = cls._get(task_id)
  172. if task is None:
  173. raise NonExistingTask(task_id)
  174. if check_inputs_are_ready:
  175. _warn_if_inputs_not_ready(task.input.values())
  176. submission = cls._orchestrator().submit_task(
  177. task, callbacks=callbacks, force=force, wait=wait, timeout=timeout, **properties
  178. )
  179. Notifier.publish(_make_event(task, EventOperation.SUBMISSION))
  180. return submission
  181. @classmethod
  182. def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[Task]:
  183. """
  184. Get all tasks by its config id.
  185. """
  186. filters = cls._build_filters_with_version(version_number)
  187. if not filters:
  188. filters = [{}]
  189. for fil in filters:
  190. fil.update({"config_id": config_id})
  191. return cls._repository._load_all(filters)