scenario_config.py 20 KB


  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from collections import defaultdict
  12. from copy import copy
  13. from typing import Any, Callable, Dict, List, Optional, Union
  14. import networkx as nx
  15. from taipy.common.config import Config
  16. from taipy.common.config._config import _Config
  17. from taipy.common.config.common._template_handler import _TemplateHandler as _tpl
  18. from taipy.common.config.common._validate_id import _validate_id
  19. from taipy.common.config.section import Section
  20. from ..common.frequency import Frequency
  21. from .data_node_config import DataNodeConfig
  22. from .task_config import TaskConfig
  23. class ScenarioConfig(Section):
  24. """Configuration fields needed to instantiate an actual `Scenario^`."""
  25. name = "SCENARIO"
  26. _SEQUENCES_KEY = "sequences"
  27. _TASKS_KEY = "tasks"
  28. _ADDITIONAL_DATA_NODES_KEY = "additional_data_nodes"
  29. _FREQUENCY_KEY = "frequency"
  30. _COMPARATOR_KEY = "comparators"
  31. frequency: Optional[Frequency]
  32. """The frequency of the scenario's cycle. The default value is None."""
  33. comparators: Dict[str, List[Callable]]
  34. """The comparator functions used to compare scenarios.
  35. The default value is None.
  36. Each comparator function is attached to a scenario's data node configuration.
  37. The key of the dictionary parameter corresponds to the data node configuration id.
  38. The value is a list of functions that are applied to all the data nodes instantiated
  39. from the data node configuration attached to the comparator.
  40. """
  41. sequences: Dict[str, List[TaskConfig]]
  42. """Dictionary of sequence descriptions. The default value is None."""
  43. def __init__(
  44. self,
  45. id: str,
  46. tasks: Optional[Union[TaskConfig, List[TaskConfig]]] = None,
  47. additional_data_nodes: Optional[Union[DataNodeConfig, List[DataNodeConfig]]] = None,
  48. frequency: Optional[Frequency] = None,
  49. comparators: Optional[Dict[str, Union[List[Callable], Callable]]] = None,
  50. sequences: Optional[Dict[str, List[TaskConfig]]] = None,
  51. **properties,
  52. ):
  53. if tasks:
  54. self._tasks = [tasks] if isinstance(tasks, TaskConfig) else copy(tasks)
  55. else:
  56. self._tasks = []
  57. if additional_data_nodes:
  58. self._additional_data_nodes = (
  59. [additional_data_nodes]
  60. if isinstance(additional_data_nodes, DataNodeConfig)
  61. else copy(additional_data_nodes)
  62. )
  63. else:
  64. self._additional_data_nodes = []
  65. self.sequences = sequences if sequences else {}
  66. self.frequency = frequency
  67. self.comparators = defaultdict(list)
  68. if comparators:
  69. for k, v in comparators.items():
  70. if isinstance(v, list):
  71. self.comparators[_validate_id(k)].extend(v)
  72. else:
  73. self.comparators[_validate_id(k)].append(v)
  74. super().__init__(id, **properties)
  75. self.__build_datanode_configs_ranks()
  76. def __copy__(self):
  77. comp = None if self.comparators is None else self.comparators
  78. return ScenarioConfig(
  79. self.id,
  80. copy(self._tasks),
  81. copy(self._additional_data_nodes),
  82. self.frequency,
  83. copy(comp),
  84. copy(self.sequences),
  85. **copy(self._properties),
  86. )
  87. def __getattr__(self, item: str) -> Optional[Any]:
  88. return _tpl._replace_templates(self._properties.get(item))
  89. @property
  90. def task_configs(self) -> List[TaskConfig]:
  91. """List of task configurations used by this scenario configuration."""
  92. return self._tasks
  93. @property
  94. def tasks(self) -> List[TaskConfig]:
  95. """List of task configurations used by this scenario configuration."""
  96. return self._tasks
  97. @property
  98. def additional_data_node_configs(self) -> List[DataNodeConfig]:
  99. """List of additional data nodes used by this scenario configuration."""
  100. return self._additional_data_nodes
  101. @property
  102. def additional_data_nodes(self) -> List[DataNodeConfig]:
  103. """List of additional data nodes used by this scenario configuration."""
  104. return self._additional_data_nodes
  105. @property
  106. def data_node_configs(self) -> List[DataNodeConfig]:
  107. """List of all data nodes used by this scenario configuration."""
  108. return self.__get_all_unique_data_nodes()
  109. @property
  110. def data_nodes(self) -> List[DataNodeConfig]:
  111. """List of all data nodes used by this scenario configuration."""
  112. return self.__get_all_unique_data_nodes()
  113. def add_comparator(self, dn_config_id: str, comparator: Callable) -> None:
  114. """Add a comparator to the scenario configuration.
  115. Arguments:
  116. dn_config_id (str): The data node configuration id to which the comparator
  117. will be applied.
  118. comparator (Callable): The comparator function to be added.
  119. """
  120. self.comparators[dn_config_id].append(comparator)
  121. def delete_comparator(self, dn_config_id: str) -> None:
  122. """Delete a comparator from the scenario configuration."""
  123. if dn_config_id in self.comparators:
  124. del self.comparators[dn_config_id]
  125. def add_sequences(self, sequences: Dict[str, List[TaskConfig]]) -> None:
  126. """Add sequence descriptions to the scenario configuration.
  127. When a `Scenario^` is instantiated from this configuration, the
  128. sequence descriptions are used to add new sequences to the scenario.
  129. Arguments:
  130. sequences (Dict[str, List[TaskConfig]]): Dictionary of sequence descriptions.
  131. """
  132. self.sequences.update(sequences)
  133. def remove_sequences(self, sequence_names: Union[str, List[str]]) -> None:
  134. """Remove sequence descriptions from the scenario configuration.
  135. Arguments:
  136. sequence_names (Union[str, List[str]]): The name of the sequence or a list
  137. of sequence names.
  138. """
  139. if isinstance(sequence_names, List):
  140. for sequence_name in sequence_names:
  141. self.sequences.pop(sequence_name)
  142. else:
  143. self.sequences.pop(sequence_names)
  144. @classmethod
  145. def default_config(cls) -> "ScenarioConfig":
  146. """Get a scenario configuration with all the default values.
  147. Returns:
  148. A scenario configuration with all the default values.
  149. """
  150. return ScenarioConfig(cls._DEFAULT_KEY, [], [], None, {})
  151. def draw(self, file_path: Optional[str] = None) -> None:
  152. """
  153. Export the scenario configuration graph as a PNG file.
  154. This function uses the `matplotlib` library to draw the scenario configuration graph.
  155. `matplotlib` must be installed independently of `taipy` as it is not a dependency.
  156. If `matplotlib` is not installed, the function will log an error message, and do nothing.
  157. Arguments:
  158. file_path (Optional[str]): The path to save the PNG file.
  159. If not provided, the file will be saved with the scenario configuration id.
  160. """
  161. from importlib import util
  162. from taipy.common.logger._taipy_logger import _TaipyLogger
  163. logger = _TaipyLogger._get_logger()
  164. if not util.find_spec("matplotlib"):
  165. logger.error("Cannot draw the scenario configuration as `matplotlib` is not installed.")
  166. return
  167. import matplotlib.pyplot as plt
  168. from taipy.core._entity._dag import _DAG
  169. graph = self.__build_nx_dag()
  170. positioned_nodes = _DAG(graph).nodes.values()
  171. pos = {node.entity: (node.x, node.y) for node in positioned_nodes}
  172. labls = {node.entity: node.entity.id for node in positioned_nodes}
  173. # Draw the graph
  174. plt.figure(figsize=(10, 10))
  175. nx.draw_networkx_nodes(graph, pos,
  176. nodelist=[node for node in graph.nodes if isinstance(node, DataNodeConfig)],
  177. node_color="skyblue",
  178. node_shape="s",
  179. node_size=2000)
  180. nx.draw_networkx_nodes(graph, pos,
  181. nodelist=[node for node in graph.nodes if isinstance(node, TaskConfig)],
  182. node_color="orange",
  183. node_shape="D",
  184. node_size=2000)
  185. nx.draw_networkx_labels(graph, pos, labels=labls)
  186. nx.draw_networkx_edges(graph, pos, node_size=2000, edge_color="black", arrowstyle="->", arrowsize=25)
  187. # Save the graph as a PNG file
  188. path = file_path or f"{self.id}.png"
  189. plt.savefig(path)
  190. plt.close() # Close the plot to avoid display
  191. logger.info(f"The graph image of the scenario configuration `{self.id}` is exported: {path}")
  192. def _clean(self):
  193. self._tasks = []
  194. self._additional_data_nodes = []
  195. self.frequency = None
  196. self.comparators = {}
  197. self.sequences = {}
  198. self._properties = {}
  199. def _to_dict(self) -> Dict[str, Any]:
  200. return {
  201. self._COMPARATOR_KEY: self.comparators,
  202. self._TASKS_KEY: self._tasks,
  203. self._ADDITIONAL_DATA_NODES_KEY: self._additional_data_nodes,
  204. self._FREQUENCY_KEY: self.frequency,
  205. self._SEQUENCES_KEY: self.sequences,
  206. **self._properties,
  207. }
  208. @classmethod
  209. def _from_dict(cls, as_dict: Dict[str, Any], id: str,
  210. config: Optional[_Config] = None) -> "ScenarioConfig": # type: ignore
  211. as_dict.pop(cls._ID_KEY, id)
  212. tasks = cls.__get_task_configs(as_dict.pop(cls._TASKS_KEY, []), config)
  213. additional_data_node_ids = as_dict.pop(cls._ADDITIONAL_DATA_NODES_KEY, [])
  214. additional_data_nodes = cls.__get_additional_data_node_configs(additional_data_node_ids, config)
  215. frequency = as_dict.pop(cls._FREQUENCY_KEY, None)
  216. comparators = as_dict.pop(cls._COMPARATOR_KEY, {})
  217. sequences = as_dict.pop(cls._SEQUENCES_KEY, {})
  218. for sequence_name, sequence_tasks in sequences.items():
  219. sequences[sequence_name] = cls.__get_task_configs(sequence_tasks, config)
  220. return ScenarioConfig(
  221. id=id,
  222. tasks=tasks,
  223. additional_data_nodes=additional_data_nodes,
  224. frequency=frequency,
  225. comparators=comparators,
  226. sequences=sequences,
  227. **as_dict,
  228. )
  229. def _update(self, as_dict: Dict[str, Any], default_section=None):
  230. self._tasks = as_dict.pop(self._TASKS_KEY, self._tasks)
  231. if self._tasks is None and default_section:
  232. self._tasks = default_section._tasks
  233. self._additional_data_nodes = as_dict.pop(self._ADDITIONAL_DATA_NODES_KEY, self._additional_data_nodes)
  234. if self._additional_data_nodes is None and default_section:
  235. self._additional_data_nodes = default_section._additional_data_nodes
  236. self.frequency = as_dict.pop(self._FREQUENCY_KEY, self.frequency)
  237. if self.frequency is None and default_section:
  238. self.frequency = default_section.frequency
  239. self.comparators = as_dict.pop(self._COMPARATOR_KEY, self.comparators)
  240. if self.comparators is None and default_section:
  241. self.comparators = default_section.comparators
  242. self.sequences = as_dict.pop(self._SEQUENCES_KEY, self.sequences)
  243. if self.sequences is None and default_section:
  244. self.sequences = default_section.sequences
  245. self._properties.update(as_dict)
  246. if default_section:
  247. self._properties = {**default_section.properties, **self._properties}
  248. @staticmethod
  249. def _types_to_register() -> List[type]:
  250. return [Frequency]
  251. @staticmethod
  252. def _configure(
  253. id: str,
  254. task_configs: Optional[List[TaskConfig]] = None,
  255. additional_data_node_configs: Optional[List[DataNodeConfig]] = None,
  256. frequency: Optional[Frequency] = None,
  257. comparators: Optional[Dict[str, Union[List[Callable], Callable]]] = None,
  258. sequences: Optional[Dict[str, List[TaskConfig]]] = None,
  259. **properties,
  260. ) -> "ScenarioConfig":
  261. """Configure a new scenario configuration.
  262. Arguments:
  263. id (str): The unique identifier of the new scenario configuration.
  264. task_configs (Optional[List[TaskConfig^]]): The list of task configurations used by this
  265. scenario configuration. The default value is None.
  266. additional_data_node_configs (Optional[List[DataNodeConfig^]]): The list of additional data nodes
  267. related to this scenario configuration. The default value is None.
  268. frequency (Optional[Frequency^]): The scenario frequency.<br/>
  269. It corresponds to the recurrence of the scenarios instantiated from this
  270. configuration. Based on this frequency each scenario will be attached to the
  271. relevant cycle.
  272. comparators (Optional[Dict[str, Union[List[Callable], Callable]]]): The list of
  273. functions used to compare scenarios. A comparator function is attached to a
  274. scenario's data node configuration. The key of the dictionary parameter
  275. corresponds to the data node configuration id. During the scenarios'
  276. comparison, each comparator is applied to all the data nodes instantiated from
  277. the data node configuration attached to the comparator. See
  278. `(taipy.)compare_scenarios()^` more details.
  279. sequences (Optional[Dict[str, List[TaskConfig]]]): Dictionary of sequence descriptions.
  280. The default value is None.
  281. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  282. Returns:
  283. The new scenario configuration.
  284. """
  285. section = ScenarioConfig(
  286. id,
  287. task_configs,
  288. additional_data_node_configs,
  289. frequency=frequency,
  290. comparators=comparators,
  291. sequences=sequences,
  292. **properties,
  293. )
  294. Config._register(section)
  295. return Config.sections[ScenarioConfig.name][id]
  296. @staticmethod
  297. def _set_default_configuration(
  298. task_configs: Optional[List[TaskConfig]] = None,
  299. additional_data_node_configs: List[DataNodeConfig] = None,
  300. frequency: Optional[Frequency] = None,
  301. comparators: Optional[Dict[str, Union[List[Callable], Callable]]] = None,
  302. sequences: Optional[Dict[str, List[TaskConfig]]] = None,
  303. **properties,
  304. ) -> "ScenarioConfig":
  305. """Set the default values for scenario configurations.
  306. This function creates the *default scenario configuration* object,
  307. where all scenario configuration objects will find their default
  308. values when needed.
  309. Arguments:
  310. task_configs (Optional[List[TaskConfig^]]): The list of task configurations used by this
  311. scenario configuration.
  312. additional_data_node_configs (Optional[List[DataNodeConfig^]]): The list of additional data nodes
  313. related to this scenario configuration.
  314. frequency (Optional[Frequency^]): The scenario frequency.
  315. It corresponds to the recurrence of the scenarios instantiated from this
  316. configuration. Based on this frequency each scenario will be attached to
  317. the relevant cycle.
  318. comparators (Optional[Dict[str, Union[List[Callable], Callable]]]): The list of
  319. functions used to compare scenarios. A comparator function is attached to a
  320. scenario's data node configuration. The key of the dictionary parameter
  321. corresponds to the data node configuration id. During the scenarios'
  322. comparison, each comparator is applied to all the data nodes instantiated from
  323. the data node configuration attached to the comparator. See
  324. `taipy.compare_scenarios()^` more details.
  325. sequences (Optional[Dict[str, List[TaskConfig]]]): Dictionary of sequences. The default value is None.
  326. **properties (dict[str, any]): A keyworded variable length list of additional arguments.
  327. Returns:
  328. The new default scenario configuration.
  329. """
  330. section = ScenarioConfig(
  331. _Config.DEFAULT_KEY,
  332. task_configs,
  333. additional_data_node_configs,
  334. frequency=frequency,
  335. comparators=comparators,
  336. sequences=sequences,
  337. **properties,
  338. )
  339. Config._register(section)
  340. return Config.sections[ScenarioConfig.name][_Config.DEFAULT_KEY]
  341. def __get_all_unique_data_nodes(self) -> List[DataNodeConfig]:
  342. data_node_configs = set(self._additional_data_nodes)
  343. for task in self._tasks:
  344. data_node_configs.update(task.inputs)
  345. data_node_configs.update(task.outputs)
  346. return list(data_node_configs)
  347. @staticmethod
  348. def __get_task_configs(task_config_ids: List[str], config: Optional[_Config]):
  349. task_configs = set()
  350. if config:
  351. if task_config_section := config._sections.get(TaskConfig.name):
  352. for task_config_id in task_config_ids:
  353. if task_config := task_config_section.get(task_config_id, None):
  354. task_configs.add(task_config)
  355. return list(task_configs)
  356. @staticmethod
  357. def __get_additional_data_node_configs(additional_data_node_ids: List[str], config: Optional[_Config]):
  358. additional_data_node_configs = set()
  359. if config:
  360. if data_node_config_section := config._sections.get(DataNodeConfig.name):
  361. for additional_data_node_id in additional_data_node_ids:
  362. if additional_data_node_config := data_node_config_section.get(additional_data_node_id):
  363. additional_data_node_configs.add(additional_data_node_config)
  364. return list(additional_data_node_configs)
  365. def __build_nx_dag(self) -> nx.DiGraph:
  366. g = nx.DiGraph()
  367. for task in set(self.tasks):
  368. if has_input := task.inputs:
  369. for predecessor in task.inputs:
  370. g.add_edges_from([(predecessor, task)])
  371. if has_output := task.outputs:
  372. for successor in task.outputs:
  373. g.add_edges_from([(task, successor)])
  374. if not has_input and not has_output:
  375. g.add_node(task)
  376. return g
  377. def __build_datanode_configs_ranks(self):
  378. # build the DAG
  379. dag = self.__build_nx_dag()
  380. # Remove tasks with no input
  381. to_remove = [t for t, degree in dict(dag.in_degree).items() if degree == 0 and isinstance(t, TaskConfig)]
  382. dag.remove_nodes_from(to_remove)
  383. # get data nodes in the dag
  384. dn_cfgs = [nodes for nodes in nx.topological_generations(dag) if (DataNodeConfig in (type(n) for n in nodes))]
  385. # assign ranks to data nodes configs starting from 1
  386. rank = 1
  387. for same_rank_datanode_cfgs in dn_cfgs:
  388. for dn_cfg in same_rank_datanode_cfgs:
  389. dn_cfg._ranks[self.id] = rank
  390. rank += 1
  391. # additional data nodes (not in the dag) have a rank of 0
  392. for add_dn_cfg in self._additional_data_nodes:
  393. add_dn_cfg._ranks[self.id] = 0