Sfoglia il codice sorgente

#enterprise-666 prepare migration feature into release/4.1 (#2604)

* Change version to 4.1.0.dev0

* Make core event registration able to support multiple topics (#2505)

* Make core event registration able to support multiple topics

Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>

(cherry picked from commit c0ba439faba937ac4f16b09118ff02646ecf2838)

* Implement EventConsumer API

* Implement UTs for EventConsumer

* Delete wrong push

* Backport simplified build process (#2575)

* Backport simplified build process

* Change version to 4.1.0

* Update __init__.py

* Update __init__.py

* Make Ruff happy

* Feedback from Eric and Florian: Systematically pass the gui as param

* fix: limit pipenv version to <2025.0.0 (#2596)

(cherry picked from commit 068f6bbe57928d0f293a750fead5633a1ae51e54)

* Expose scope in task configuration

* fix linter

* fix linter

* Apply suggestions from code review

Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>

---------

Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>
Jean-Robin 1 settimana fa
parent
commit
afe72633e0

+ 11 - 15
taipy/core/config/task_config.py

@@ -15,6 +15,7 @@ from typing import Any, Callable, Dict, List, Optional, Union, cast
 from taipy.common.config import Config
 from taipy.common.config._config import _Config
 from taipy.common.config.common._template_handler import _TemplateHandler as _tpl
+from taipy.common.config.common.scope import Scope
 from taipy.common.config.section import Section
 
 from .data_node_config import DataNodeConfig
@@ -23,21 +24,6 @@ from .data_node_config import DataNodeConfig
 class TaskConfig(Section):
     """Configuration fields needed to instantiate an actual `Task^`."""
 
-    # Attributes:
-    #     inputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
-    #         `DataNodeConfig^` inputs.<br/>
-    #         The default value is [].
-    #     outputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
-    #         `DataNodeConfig^` outputs.<br/>
-    #         The default value is [].
-    #     skippable (bool): If True, indicates that the task can be skipped if no change has
-    #         been made on inputs.<br/>
-    #         The default value is False.
-    #     function (Callable): User function taking as inputs some parameters compatible with the
-    #         exposed types (*exposed_type* field) of the input data nodes and returning results
-    #         compatible with the exposed types (*exposed_type* field) of the outputs list.<br/>
-    #         The default value is None.
-
     name = "TASK"
 
     _INPUT_KEY = "inputs"
@@ -99,6 +85,16 @@ class TaskConfig(Section):
         """The list of the output data node configurations."""
         return list(self._outputs)
 
+    @property
+    def scope(self) -> Scope:
+        """The lowest scope of the task's data nodes.
+
+        The lowest scope of input and output data node configurations or GLOBAL if there is
+        either no input or no output configuration.
+        """
+        data_node_cfgs = list(self.inputs) + list(self.outputs)
+        return Scope(min(dn.scope for dn in data_node_cfgs)) if len(data_node_cfgs) != 0 else Scope.GLOBAL
+
     @property
     def skippable(self) -> bool:
         """Indicates if the task can be skipped if no change has been made on inputs."""

+ 24 - 8
taipy/core/scenario/scenario.py

@@ -638,38 +638,54 @@ class Scenario(_Entity, Submittable, _Labeled):
     def _get_set_of_tasks(self) -> Set[Task]:
         return set(self.tasks.values())
 
-    def __get_data_nodes(self) -> Dict[str, DataNode]:
-        data_nodes_dict = self.__get_additional_data_nodes()
-        for _, task in self.__get_tasks().items():
+    def _get_existing_data_nodes(self, additional: bool = False) -> Dict[str, DataNode]:
+        if additional:
+            return self.__get_additional_data_nodes(raise_not_existing=False)
+        return self.__get_data_nodes(False)
+
+    def __get_data_nodes(self, raise_not_existing: bool = True) -> Dict[str, DataNode]:
+        data_nodes_dict = self.__get_additional_data_nodes(raise_not_existing)
+        for _, task in self.__get_tasks(raise_not_existing).items():
             data_nodes_dict.update(task.data_nodes)
         return data_nodes_dict
 
-    def __get_additional_data_nodes(self):
+    def __get_additional_data_nodes(self, raise_not_existing: bool=True) -> Dict[str, DataNode]:
         from ..data._data_manager_factory import _DataManagerFactory
 
         additional_data_nodes = {}
+        non_existing_dns: List = []
         data_manager = _DataManagerFactory._build_manager()
 
         for dn_or_id in self._additional_data_nodes:
             dn = data_manager._get(dn_or_id, dn_or_id)
 
             if not isinstance(dn, DataNode):
-                raise NonExistingDataNode(dn_or_id)
+                if raise_not_existing:
+                    raise NonExistingDataNode(dn_or_id)
+                non_existing_dns.append(dn_or_id)
+                continue
             additional_data_nodes[dn.config_id] = dn
+        for dn_id in non_existing_dns:
+            self._additional_data_nodes.discard(dn_id)  # type: ignore[arg-type]
         return additional_data_nodes
 
-    def __get_tasks(self) -> Dict[str, Task]:
+    def __get_tasks(self, raise_not_existing: bool = True) -> Dict[str, Task]:
         from ..task._task_manager_factory import _TaskManagerFactory
 
         _tasks = {}
         task_manager = _TaskManagerFactory._build_manager()
-
+        non_existing_tasks: List = []
         for task_or_id in self._tasks:
             t = task_manager._get(task_or_id, task_or_id)
 
             if not isinstance(t, Task):
-                raise NonExistingTask(task_or_id)
+                if raise_not_existing:
+                    raise NonExistingTask(task_or_id)
+                non_existing_tasks.append(task_or_id)
+                continue
             _tasks[t.config_id] = t
+        for t_id in non_existing_tasks:
+            self._tasks.discard(t_id)  # type: ignore[arg-type]
         return _tasks
 
     @staticmethod

+ 23 - 7
tests/core/config/test_task_config.py

@@ -12,6 +12,7 @@
 import os
 from unittest import mock
 
+from taipy import Scope
 from taipy.common.config import Config
 from taipy.core.config import DataNodeConfig
 from tests.core.utils.named_temporary_file import NamedTemporaryFile
@@ -95,15 +96,30 @@ def test_data_node_instance_when_configure_task_by_overriding_toml():
 
 def test_task_config_creation():
     input_config = Config.configure_data_node("input")
-    output_config = Config.configure_data_node("output")
-    task_config = Config.configure_task("tasks1", print, input_config, output_config)
+    output_config = Config.configure_data_node("output", scope=Scope.GLOBAL)
+    other_config = Config.configure_data_node("other", scope=Scope.CYCLE)
 
-    assert not task_config.skippable
-    assert list(Config.tasks) == ["default", task_config.id]
+    task_config = Config.configure_task("task1", print, input_config, output_config)
 
-    task2 = Config.configure_task("tasks2", print, input_config, output_config, skippable=True)
-    assert task2.skippable
-    assert list(Config.tasks) == ["default", task_config.id, task2.id]
+    assert list(Config.tasks) == ["default", task_config.id]
+    assert not task_config.skippable
+    assert task_config.id == "task1"
+    assert task_config.function == print
+    assert task_config.input_configs == [input_config]
+    assert task_config.output_configs == [output_config]
+    assert task_config.scope == Scope.SCENARIO
+    assert task_config.properties == {}
+
+
+    task_config_2 = Config.configure_task("task2", print, other_config, output_config, skippable=True)
+    assert list(Config.tasks) == ["default", task_config.id, task_config_2.id]
+    assert task_config_2.skippable
+    assert task_config_2.id == "task2"
+    assert task_config_2.function == print
+    assert task_config_2.input_configs == [other_config]
+    assert task_config_2.output_configs == [output_config]
+    assert task_config_2.scope == Scope.CYCLE
+    assert task_config_2.properties == {}
 
 
 def test_task_count():