Browse Source

Enterprise 666 prepare migration feature (#2605)

* Expose scope in task configuration

* scope has moved

* scope has moved

* fix linter

* fix linter

* Apply suggestions
Jean-Robin 1 week ago
parent
commit
6c5fd6ee9e

+ 11 - 15
taipy/core/config/task_config.py

@@ -17,27 +17,13 @@ from taipy.common.config._config import _Config
 from taipy.common.config.common._template_handler import _TemplateHandler as _tpl
 from taipy.common.config.section import Section
 
+from ..common.scope import Scope
 from .data_node_config import DataNodeConfig
 
 
 class TaskConfig(Section):
     """Configuration fields needed to instantiate an actual `Task^`."""
 
-    # Attributes:
-    #     inputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
-    #         `DataNodeConfig^` inputs.<br/>
-    #         The default value is [].
-    #     outputs (Union[DataNodeConfig^, List[DataNodeConfig^]]): The optional list of
-    #         `DataNodeConfig^` outputs.<br/>
-    #         The default value is [].
-    #     skippable (bool): If True, indicates that the task can be skipped if no change has
-    #         been made on inputs.<br/>
-    #         The default value is False.
-    #     function (Callable): User function taking as inputs some parameters compatible with the
-    #         exposed types (*exposed_type* field) of the input data nodes and returning results
-    #         compatible with the exposed types (*exposed_type* field) of the outputs list.<br/>
-    #         The default value is None.
-
     name = "TASK"
 
     _INPUT_KEY = "inputs"
@@ -102,6 +88,16 @@ class TaskConfig(Section):
         """The list of the output data node configurations."""
         return list(self._outputs)
 
+    @property
+    def scope(self) -> Scope:
+        """The lowest scope of the task's data nodes.
+
+        The lowest scope of input and output data node configurations or GLOBAL if there is
+        either no input or no output configuration.
+        """
+        data_node_cfgs = list(self.inputs) + list(self.outputs)
+        return Scope(min(dn.scope for dn in data_node_cfgs)) if len(data_node_cfgs) != 0 else Scope.GLOBAL
+
     @property
     def skippable(self) -> bool:
         """Indicates if the task can be skipped if no change has been made on inputs."""

+ 24 - 8
taipy/core/scenario/scenario.py

@@ -714,38 +714,54 @@ class Scenario(_Entity, Submittable, _Labeled):
     def _get_set_of_tasks(self) -> Set[Task]:
         return set(self.tasks.values())
 
-    def __get_data_nodes(self) -> Dict[str, DataNode]:
-        data_nodes_dict = self.__get_additional_data_nodes()
-        for _, task in self.__get_tasks().items():
+    def _get_existing_data_nodes(self, additional: bool = False) -> Dict[str, DataNode]:
+        if additional:
+            return self.__get_additional_data_nodes(raise_not_existing=False)
+        return self.__get_data_nodes(False)
+
+    def __get_data_nodes(self, raise_not_existing: bool = True) -> Dict[str, DataNode]:
+        data_nodes_dict = self.__get_additional_data_nodes(raise_not_existing)
+        for _, task in self.__get_tasks(raise_not_existing).items():
             data_nodes_dict.update(task.data_nodes)
         return data_nodes_dict
 
-    def __get_additional_data_nodes(self):
+    def __get_additional_data_nodes(self, raise_not_existing: bool=True) -> Dict[str, DataNode]:
         from ..data._data_manager_factory import _DataManagerFactory
 
         additional_data_nodes = {}
+        non_existing_dns: List = []
         data_manager = _DataManagerFactory._build_manager()
 
         for dn_or_id in self._additional_data_nodes:
             dn = data_manager._get(dn_or_id, dn_or_id)
 
             if not isinstance(dn, DataNode):
-                raise NonExistingDataNode(dn_or_id)
+                if raise_not_existing:
+                    raise NonExistingDataNode(dn_or_id)
+                non_existing_dns.append(dn_or_id)
+                continue
             additional_data_nodes[dn.config_id] = dn
+        for dn_id in non_existing_dns:
+            self._additional_data_nodes.discard(dn_id)  # type: ignore[arg-type]
         return additional_data_nodes
 
-    def __get_tasks(self) -> Dict[str, Task]:
+    def __get_tasks(self, raise_not_existing: bool = True) -> Dict[str, Task]:
         from ..task._task_manager_factory import _TaskManagerFactory
 
         _tasks = {}
         task_manager = _TaskManagerFactory._build_manager()
-
+        non_existing_tasks: List = []
         for task_or_id in self._tasks:
             t = task_manager._get(task_or_id, task_or_id)
 
             if not isinstance(t, Task):
-                raise NonExistingTask(task_or_id)
+                if raise_not_existing:
+                    raise NonExistingTask(task_or_id)
+                non_existing_tasks.append(task_or_id)
+                continue
             _tasks[t.config_id] = t
+        for t_id in non_existing_tasks:
+            self._tasks.discard(t_id)  # type: ignore[arg-type]
         return _tasks
 
     @staticmethod

+ 23 - 7
tests/core/config/test_task_config.py

@@ -12,6 +12,7 @@
 import os
 from unittest import mock
 
+from taipy import Scope
 from taipy.common.config import Config
 from taipy.core.config import DataNodeConfig
 from tests.core.utils.named_temporary_file import NamedTemporaryFile
@@ -95,15 +96,30 @@ def test_data_node_instance_when_configure_task_by_overriding_toml():
 
 def test_task_config_creation():
     input_config = Config.configure_data_node("input")
-    output_config = Config.configure_data_node("output")
-    task_config = Config.configure_task("tasks1", print, input_config, output_config)
+    output_config = Config.configure_data_node("output", scope=Scope.GLOBAL)
+    other_config = Config.configure_data_node("other", scope=Scope.CYCLE)
 
-    assert not task_config.skippable
-    assert list(Config.tasks) == ["default", task_config.id]
+    task_config = Config.configure_task("task1", print, input_config, output_config)
 
-    task2 = Config.configure_task("tasks2", print, input_config, output_config, skippable=True)
-    assert task2.skippable
-    assert list(Config.tasks) == ["default", task_config.id, task2.id]
+    assert list(Config.tasks) == ["default", task_config.id]
+    assert not task_config.skippable
+    assert task_config.id == "task1"
+    assert task_config.function == print
+    assert task_config.input_configs == [input_config]
+    assert task_config.output_configs == [output_config]
+    assert task_config.scope == Scope.SCENARIO
+    assert task_config.properties == {}
+
+
+    task_config_2 = Config.configure_task("task2", print, other_config, output_config, skippable=True)
+    assert list(Config.tasks) == ["default", task_config.id, task_config_2.id]
+    assert task_config_2.skippable
+    assert task_config_2.id == "task2"
+    assert task_config_2.function == print
+    assert task_config_2.input_configs == [other_config]
+    assert task_config_2.output_configs == [output_config]
+    assert task_config_2.scope == Scope.CYCLE
+    assert task_config_2.properties == {}
 
 
 def test_task_count():

+ 2 - 2
tools/release/common.py

@@ -65,7 +65,7 @@ class Version:
     def from_string(cls, version: str):
         """Creates a Version from a string.
 
-        Parameters:
+        Arguments:
             version: a version name as a string.<br/>
               The format should be "<major>.<minor>[.<patch>[.<extension>]] where
 
@@ -138,7 +138,7 @@ class Version:
             package-1.2.3.extA is NOT compatible with any sub-package-1.2.*.extB if extA != extB,
                independently of a potential extension index.
 
-        Parameters:
+        Arguments:
             version: the version to check compatibility against.
 
         Returns: