task.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
  11. import uuid
  12. from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Union
  13. from taipy.config.common._template_handler import _TemplateHandler as _tpl
  14. from taipy.config.common._validate_id import _validate_id
  15. from taipy.config.common.scope import Scope
  16. from .._entity._entity import _Entity
  17. from .._entity._labeled import _Labeled
  18. from .._entity._properties import _Properties
  19. from .._entity._reload import _Reloader, _self_reload, _self_setter
  20. from .._version._version_manager_factory import _VersionManagerFactory
  21. from ..data.data_node import DataNode
  22. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  23. from ..submission.submission import Submission
  24. from .task_id import TaskId
  25. class Task(_Entity, _Labeled):
  26. """Hold a user function that will be executed, its parameters and the results.
  27. A `Task` brings together the user code as function, the inputs and the outputs
  28. as data nodes (instances of the `DataNode^` class).
  29. !!! note
  30. It is not recommended to instantiate a `Task` directly. Instead, it should be
  31. created with the `create_scenario()^` function. When creating a `Scenario^`,
  32. the related data nodes and tasks are created automatically. Please refer to
  33. the `Scenario^` class for more information.
  34. A task's attributes (the input data nodes, the output data nodes, the Python
  35. function) are populated based on its task configuration `TaskConfig^`.
  36. !!! Example
  37. ```python
  38. import taipy as tp
  39. from taipy import Config
  40. def by_two(x: int):
  41. return x * 2
  42. # Configure data nodes, tasks and scenarios
  43. input_cfg = Config.configure_data_node("my_input", default_data=2)
  44. result_cfg = Config.configure_data_node("my_result")
  45. task_cfg = Config.configure_task("my_double", function=by_two, input=input_cfg, output=result_cfg)
  46. scenario_cfg = Config.configure_scenario("my_scenario", task_configs=[task_cfg])
  47. # Instantiate a task along with a scenario
  48. sc = tp.create_scenario(scenario_cfg)
  49. # Retrieve task and data nodes from scenario
  50. task_input = sc.my_input
  51. double_task = sc.my_double
  52. task_result = sc.my_result
  53. # Write the input data and submit the task
  54. task_input.write(3)
  55. double_task.submit()
  56. # Read the result
  57. print(task_result.read()) # Output: 6
  58. # Retrieve the list of all tasks
  59. all_tasks = tp.get_tasks()
  60. ```
  61. Attributes:
  62. config_id (str): The identifier of the `TaskConfig^`.
  63. properties (dict[str, Any]): A dictionary of additional properties.
  64. function (callable): The python function to execute. The _function_ must take as parameter the
  65. data referenced by inputs data nodes, and must return the data referenced by outputs data nodes.
  66. input (Union[DataNode^, List[DataNode^]]): The list of inputs.
  67. output (Union[DataNode^, List[DataNode^]]): The list of outputs.
  68. id (str): The unique identifier of the task.
  69. owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or None.
  70. parent_ids (Optional[Set[str]]): The set of identifiers of the parent sequences.
  71. version (str): The string indicates the application version of the task to instantiate. If not provided, the
  72. latest version is used.
  73. skippable (bool): If True, indicates that the task can be skipped if no change has been made on inputs. The
  74. default value is _False_.
  75. """
  76. _ID_PREFIX = "TASK"
  77. __ID_SEPARATOR = "_"
  78. _MANAGER_NAME = "task"
  79. def __init__(
  80. self,
  81. config_id: str,
  82. properties: Dict[str, Any],
  83. function: Callable,
  84. input: Optional[Iterable[DataNode]] = None,
  85. output: Optional[Iterable[DataNode]] = None,
  86. id: Optional[TaskId] = None,
  87. owner_id: Optional[str] = None,
  88. parent_ids: Optional[Set[str]] = None,
  89. version: Optional[str] = None,
  90. skippable: bool = False,
  91. ) -> None:
  92. self._config_id = _validate_id(config_id)
  93. self.id = id or TaskId(self.__ID_SEPARATOR.join([self._ID_PREFIX, self.config_id, str(uuid.uuid4())]))
  94. self._owner_id = owner_id
  95. self._parent_ids = parent_ids or set()
  96. self._input = {dn.config_id: dn for dn in input or []}
  97. self._output = {dn.config_id: dn for dn in output or []}
  98. self._function = function
  99. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  100. self._skippable = skippable
  101. self._properties = _Properties(self, **properties)
  102. def __hash__(self):
  103. return hash(self.id)
  104. def __eq__(self, other):
  105. return isinstance(other, Task) and self.id == other.id
  106. def __getstate__(self):
  107. return vars(self)
  108. def __setstate__(self, state):
  109. vars(self).update(state)
  110. def __getattr__(self, attribute_name):
  111. protected_attribute_name = _validate_id(attribute_name)
  112. if protected_attribute_name in self._properties:
  113. return _tpl._replace_templates(self._properties[protected_attribute_name])
  114. if protected_attribute_name in self.input:
  115. return self.input[protected_attribute_name]
  116. if protected_attribute_name in self.output:
  117. return self.output[protected_attribute_name]
  118. raise AttributeError(f"{attribute_name} is not an attribute of task {self.id}")
  119. @property
  120. def properties(self):
  121. self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
  122. return self._properties
  123. @property
  124. def config_id(self):
  125. return self._config_id
  126. @property
  127. def owner_id(self):
  128. return self._owner_id
  129. def get_parents(self):
  130. """Get parents of the task."""
  131. from ... import core as tp
  132. return tp.get_parents(self)
  133. @property # type: ignore
  134. @_self_reload(_MANAGER_NAME)
  135. def parent_ids(self):
  136. return self._parent_ids
  137. @property
  138. def input(self) -> Dict[str, DataNode]:
  139. return self._input
  140. @property
  141. def output(self) -> Dict[str, DataNode]:
  142. return self._output
  143. @property
  144. def data_nodes(self) -> Dict[str, DataNode]:
  145. return {**self.input, **self.output}
  146. @property # type: ignore
  147. @_self_reload(_MANAGER_NAME)
  148. def function(self):
  149. return self._function
  150. @function.setter # type: ignore
  151. @_self_setter(_MANAGER_NAME)
  152. def function(self, val):
  153. self._function = val
  154. @property # type: ignore
  155. @_self_reload(_MANAGER_NAME)
  156. def skippable(self):
  157. return self._skippable
  158. @skippable.setter # type: ignore
  159. @_self_setter(_MANAGER_NAME)
  160. def skippable(self, val):
  161. self._skippable = val
  162. @property
  163. def scope(self) -> Scope:
  164. """Retrieve the lowest scope of the task based on its data nodes.
  165. Returns:
  166. The lowest scope present in input and output data nodes or GLOBAL if there are
  167. either no input or no output.
  168. """
  169. data_nodes = list(self._input.values()) + list(self._output.values())
  170. return Scope(min(dn.scope for dn in data_nodes)) if len(data_nodes) != 0 else Scope.GLOBAL
  171. @property
  172. def version(self):
  173. return self._version
  174. def submit(
  175. self,
  176. callbacks: Optional[List[Callable]] = None,
  177. force: bool = False,
  178. wait: bool = False,
  179. timeout: Optional[Union[float, int]] = None,
  180. **properties,
  181. ) -> Submission:
  182. """Submit the task for execution.
  183. Parameters:
  184. callbacks (List[Callable]): The list of callable functions to be called on status
  185. change.
  186. force (bool): Force execution even if the data nodes are in cache.
  187. wait (bool): Wait for the orchestrated job created from the task submission to be finished in asynchronous
  188. mode.
  189. timeout (Union[float, int]): The maximum number of seconds to wait for the job to be finished before
  190. returning.<br/>
  191. If not provided and *wait* is True, the function waits indefinitely.
  192. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  193. Returns:
  194. A `Submission^` containing the information of the submission.
  195. """
  196. from ._task_manager_factory import _TaskManagerFactory
  197. return _TaskManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
  198. def get_label(self) -> str:
  199. """Returns the task simple label prefixed by its owner label.
  200. Returns:
  201. The label of the task as a string.
  202. """
  203. return self._get_label()
  204. def get_simple_label(self) -> str:
  205. """Returns the task simple label.
  206. Returns:
  207. The simple label of the task as a string.
  208. """
  209. return self._get_simple_label()
  210. @_make_event.register(Task)
  211. def _make_event_for_task(
  212. task: Task,
  213. operation: EventOperation,
  214. /,
  215. attribute_name: Optional[str] = None,
  216. attribute_value: Optional[Any] = None,
  217. **kwargs,
  218. ) -> Event:
  219. metadata = {"version": task.version, "config_id": task.config_id, **kwargs}
  220. return Event(
  221. entity_type=EventEntityType.TASK,
  222. entity_id=task.id,
  223. operation=operation,
  224. attribute_name=attribute_name,
  225. attribute_value=attribute_value,
  226. metadata=metadata,
  227. )