task_config.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from copy import copy
  12. from typing import Any, Callable, Dict, List, Optional, Union, cast
  13. from taipy.common.config import Config
  14. from taipy.common.config._config import _Config
  15. from taipy.common.config.common._template_handler import _TemplateHandler as _tpl
  16. from taipy.common.config.section import Section
  17. from .data_node_config import DataNodeConfig
  18. class TaskConfig(Section):
  19. """Configuration fields needed to instantiate an actual `Task^`."""
  20. # Attributes:
  21. # inputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
  22. # `DataNodeConfig^` inputs.<br/>
  23. # The default value is [].
  24. # outputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
  25. # `DataNodeConfig^` outputs.<br/>
  26. # The default value is [].
  27. # skippable (bool): If True, indicates that the task can be skipped if no change has
  28. # been made on inputs.<br/>
  29. # The default value is False.
  30. # function (Callable): User function taking as inputs some parameters compatible with the
  31. # exposed types (*exposed_type* field) of the input data nodes and returning results
  32. # compatible with the exposed types (*exposed_type* field) of the outputs list.<br/>
  33. # The default value is None.
  34. name = "TASK"
  35. _INPUT_KEY = "inputs"
  36. _FUNCTION = "function"
  37. _OUTPUT_KEY = "outputs"
  38. _IS_SKIPPABLE_KEY = "skippable"
  39. function: Optional[Callable]
  40. """User function taking as inputs some parameters compatible with the data type
  41. (*exposed_type* field) of the input data nodes and returning results compatible with the
  42. data type (*exposed_type* field) of the outputs list."""
  43. # NOTE: # {task_type: ["required_key1"]}
  44. _REQUIRED_PROPERTIES: Dict[str, List[str]] = {}
  45. def __init__(
  46. self,
  47. id: str,
  48. function: Optional[Callable],
  49. inputs: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  50. outputs: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  51. skippable: bool = False,
  52. **properties,
  53. ) -> None:
  54. if inputs:
  55. self._inputs = [inputs] if isinstance(inputs, DataNodeConfig) else copy(inputs)
  56. else:
  57. self._inputs = []
  58. if outputs:
  59. self._outputs = [outputs] if isinstance(outputs, DataNodeConfig) else copy(outputs)
  60. else:
  61. self._outputs = []
  62. self._skippable: bool = skippable
  63. self.function: Optional[Callable] = function
  64. super().__init__(id, **properties)
  65. def __copy__(self):
  66. return TaskConfig(
  67. self.id, self.function, copy(self._inputs), copy(self._outputs), self.skippable, **copy(self._properties)
  68. )
  69. def __getattr__(self, item: str) -> Optional[Any]:
  70. return _tpl._replace_templates(self._properties.get(item))
  71. @property
  72. def input_configs(self) -> List[DataNodeConfig]:
  73. """The list of the input data node configurations."""
  74. return list(self._inputs)
  75. @property
  76. def inputs(self) -> List[DataNodeConfig]:
  77. """The list of the input data node configurations."""
  78. return list(self._inputs)
  79. @property
  80. def output_configs(self) -> List[DataNodeConfig]:
  81. """The list of the output data node configurations."""
  82. return list(self._outputs)
  83. @property
  84. def outputs(self) -> List[DataNodeConfig]:
  85. """The list of the output data node configurations."""
  86. return list(self._outputs)
  87. @property
  88. def skippable(self) -> bool:
  89. """Indicates if the task can be skipped if no change has been made on inputs."""
  90. return _tpl._replace_templates(self._skippable)
  91. @classmethod
  92. def default_config(cls) -> "TaskConfig":
  93. """Get the default task configuration.
  94. Returns:
  95. The default task configuration.
  96. """
  97. return TaskConfig(cls._DEFAULT_KEY, None, [], [], False)
  98. def _clean(self) -> None:
  99. self.function = None
  100. self._inputs = []
  101. self._outputs = []
  102. self._skippable = False
  103. self._properties.clear()
  104. def _to_dict(self):
  105. return {
  106. self._FUNCTION: self.function,
  107. self._INPUT_KEY: self._inputs,
  108. self._OUTPUT_KEY: self._outputs,
  109. self._IS_SKIPPABLE_KEY: self._skippable,
  110. **self._properties,
  111. }
  112. @classmethod
  113. def _from_dict(cls, as_dict: Dict[str, Any], id: str, config: Optional[_Config]):
  114. as_dict.pop(cls._ID_KEY, id)
  115. funct = as_dict.pop(cls._FUNCTION, None)
  116. dn_configs: Dict[str, DataNodeConfig] = {}
  117. if config:
  118. dn_configs = cast(Dict[str, DataNodeConfig], config._sections.get(DataNodeConfig.name))
  119. inputs: List[DataNodeConfig] = []
  120. if inputs_as_str := as_dict.pop(cls._INPUT_KEY, None):
  121. inputs = [dn_configs[dn_id] for dn_id in inputs_as_str if dn_id in dn_configs]
  122. outputs: List[DataNodeConfig] = []
  123. if outputs_as_str := as_dict.pop(cls._OUTPUT_KEY, None):
  124. outputs = [dn_configs[ds_id] for ds_id in outputs_as_str if ds_id in dn_configs]
  125. skippable = as_dict.pop(cls._IS_SKIPPABLE_KEY, False)
  126. return TaskConfig(id=id, function=funct, inputs=inputs, outputs=outputs, skippable=skippable, **as_dict)
  127. def _update(self, as_dict, default_section=None):
  128. function = as_dict.pop(self._FUNCTION, None)
  129. if function is not None and not isinstance(function, str):
  130. self.function = function
  131. self._inputs = as_dict.pop(self._INPUT_KEY, self._inputs)
  132. if self._inputs is None and default_section:
  133. self._inputs = default_section._inputs
  134. self._outputs = as_dict.pop(self._OUTPUT_KEY, self._outputs)
  135. if self._outputs is None and default_section:
  136. self._outputs = default_section._outputs
  137. self._skippable = as_dict.pop(self._IS_SKIPPABLE_KEY, self._skippable)
  138. self._properties.update(as_dict)
  139. if default_section:
  140. self._properties = {**default_section.properties, **self._properties}
  141. @staticmethod
  142. def _configure(
  143. id: str,
  144. function: Optional[Callable],
  145. input: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  146. output: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  147. skippable: bool = False,
  148. **properties,
  149. ) -> "TaskConfig":
  150. """Configure a new task configuration.
  151. Arguments:
  152. id (str): The unique identifier of this task configuration.
  153. function (Callable): The python function called by Taipy to run the task.
  154. input (Optional[Union[DataNodeConfig^, List[DataNodeConfig^]]]): The list of the
  155. function input data node configurations. This can be a unique data node
  156. configuration if there is a single input data node, or None if there are none.
  157. output (Optional[Union[DataNodeConfig^, List[DataNodeConfig^]]]): The list of the
  158. function output data node configurations. This can be a unique data node
  159. configuration if there is a single output data node, or None if there are none.
  160. skippable (bool): If True, indicates that the task can be skipped if no change has
  161. been made on inputs.<br/>
  162. The default value is False.
  163. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  164. Returns:
  165. The new task configuration.
  166. """
  167. section = TaskConfig(id, function, input, output, skippable, **properties)
  168. Config._register(section)
  169. return Config.sections[TaskConfig.name][id]
  170. @staticmethod
  171. def _set_default_configuration(
  172. function: Optional[Callable],
  173. input: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  174. output: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  175. skippable: bool = False,
  176. **properties,
  177. ) -> "TaskConfig":
  178. """Set the default values for task configurations.
  179. This function creates the *default task configuration* object,
  180. where all task configuration objects will find their default
  181. values when needed.
  182. Arguments:
  183. function (Callable): The python function called by Taipy to run the task.
  184. input (Optional[Union[DataNodeConfig^, List[DataNodeConfig^]]]): The list of the
  185. input data node configurations. This can be a unique data node
  186. configuration if there is a single input data node, or None if there are none.
  187. output (Optional[Union[DataNodeConfig^, List[DataNodeConfig^]]]): The list of the
  188. output data node configurations. This can be a unique data node
  189. configuration if there is a single output data node, or None if there are none.
  190. skippable (bool): If True, indicates that the task can be skipped if no change has
  191. been made on inputs.<br/>
  192. The default value is False.
  193. **properties (dict[str, any]): A keyworded variable length list of additional
  194. arguments.
  195. Returns:
  196. The default task configuration.
  197. """
  198. section = TaskConfig(_Config.DEFAULT_KEY, function, input, output, skippable, **properties)
  199. Config._register(section)
  200. return Config.sections[TaskConfig.name][_Config.DEFAULT_KEY]