
feature/#397 scenario duplication (#2373)

* draft for scenario duplication

* added cloning data files

* added tests for copying data files

* added tests for cloning entities

* fixed: prevent replacing the current in_memory entity with the cloned entity

* added checks for existing task and cycle

* added tests for cloning with same or different cycle

* fixed failing tests

* remove the Taipy clone prefix when cloning multiple times and add can_duplicate functions

* clean up code

* expose the duplicate and can_duplicate functions in taipy

* make linter happy

* add duplicating sequences

* minor refactor

* minor refactor

* Apply suggestions from code review

* Update taipy/core/data/_file_datanode_mixin.py

* Update taipy/core/data/_file_datanode_mixin.py

* Refactoring: Create Duplicator service

* minor changes

* minor cleaning

* handling properties._entity_owner

* making reason messages consistent

* Fix duplicator attribute initialization and parent ids updates

* Move data duplication from data node to data duplicator.

* Make Ruff happy

* linter

* linter

* linter

* linter

* ruff

* Apply suggestions from code review

Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>

* using a better exception

* optimize imports

* optimize imports

---------

Co-authored-by: Toan Quach <shiro@192.168.1.8.non-exists.ptr.local>
Co-authored-by: Toan Quach <shiro@Shiros-MacBook-Pro.local>
Co-authored-by: Jean-Robin <jeanrobin.medori@avaiga.com>
Co-authored-by: Đỗ Trường Giang <do.giang@avaiga.com>
Toan Quach 2 months ago
parent commit 2d4ec9c5d8
30 files changed, 1399 insertions, 163 deletions
   1. +6 -2      taipy/core/_repository/_filesystem_repository.py
   2. +2 -2      taipy/core/_version/_version_manager.py
   3. +45 -0     taipy/core/data/_data_duplicator.py
   4. +34 -14    taipy/core/data/_data_manager.py
   5. +32 -12    taipy/core/data/_file_datanode_mixin.py
   6. +29 -20    taipy/core/data/data_node.py
   7. +4 -0      taipy/core/exceptions/exceptions.py
   8. +30 -18    taipy/core/reason/reason.py
   9. +217 -0    taipy/core/scenario/_scenario_duplicator.py
  10. +36 -0     taipy/core/scenario/_scenario_manager.py
  11. +38 -0     taipy/core/scenario/scenario.py
  12. +40 -8     taipy/core/taipy.py
  13. +23 -7     taipy/core/task/_task_manager.py
  14. +11 -18    taipy/core/task/task.py
  15. +15 -15    tests/core/_entity/test_ready_to_run_property.py
  16. +2 -2      tests/core/_manager/test_manager.py
  17. +8 -8      tests/core/data/test_csv_data_node.py
  18. +20 -3     tests/core/data/test_data_manager.py
  19. +9 -9      tests/core/data/test_excel_data_node.py
  20. +49 -0     tests/core/data/test_file_datanode_mixin.py
  21. +4 -4      tests/core/data/test_json_data_node.py
  22. +8 -8      tests/core/data/test_parquet_data_node.py
  23. +4 -4      tests/core/data/test_pickle_data_node.py
  24. +1 -1      tests/core/job/test_job_manager.py
  25. +640 -0    tests/core/scenario/test_scenario_duplicator.py
  26. +44 -5     tests/core/scenario/test_scenario_manager.py
  27. +1 -1      tests/core/sequence/test_sequence_manager.py
  28. +1 -1      tests/core/submission/test_submission_manager.py
  29. +20 -1     tests/core/task/test_task_manager.py
  30. +26 -0     tests/core/test_taipy.py

+ 6 - 2
taipy/core/_repository/_filesystem_repository.py

@@ -191,10 +191,14 @@ class _FileSystemRepository(_AbstractRepository[ModelType, Entity]):
         return None
 
     def __match_file_and_get_entity(self, filepath, config_and_owner_ids, filters):
-        if match := [(c, p) for c, p in config_and_owner_ids if c.id in filepath.name]:
+        if match := [(c, p) for c, p in config_and_owner_ids if (c if isinstance(c, str) else c.id) in filepath.name]:
             for config, owner_id in match:
                 for fil in filters:
-                    fil.update({"config_id": config.id, "owner_id": owner_id})
+                    if isinstance(config, str):
+                        config_id = config
+                    else:
+                        config_id = config.id
+                    fil.update({"config_id": config_id, "owner_id": owner_id})
 
                 if data := self.__filter_by(filepath, filters):
                     return config, owner_id, self.__file_content_to_entity(data)

+ 2 - 2
taipy/core/_version/_version_manager.py

@@ -18,6 +18,7 @@ from taipy.common.config.exceptions.exceptions import InconsistentEnvVariableErr
 from taipy.common.logger._taipy_logger import _TaipyLogger
 
 from .._manager._manager import _Manager
+from .._repository._abstract_repository import _AbstractRepository
 from ..exceptions.exceptions import (
     ConfigCoreVersionMismatched,
     ConflictedConfigurationError,
@@ -27,7 +28,6 @@ from ..exceptions.exceptions import (
 )
 from ..reason import ReasonCollection
 from ._version import _Version
-from ._version_fs_repository import _VersionFSRepository
 
 
 class _VersionManager(_Manager[_Version]):
@@ -41,7 +41,7 @@ class _VersionManager(_Manager[_Version]):
 
     _DEFAULT_VERSION = _LATEST_VERSION
 
-    _repository: _VersionFSRepository
+    _repository: _AbstractRepository
 
     @classmethod
     def _get(cls, entity: Union[str, _Version], default=None) -> _Version:

+ 45 - 0
taipy/core/data/_data_duplicator.py

@@ -0,0 +1,45 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from ..data.data_node import DataNode
+from ..exceptions import WrongDataNodeType
+from ._file_datanode_mixin import _FileDataNodeMixin
+
+
+class _DataDuplicator:
+    """A service to duplicate data nodes data."""
+
+    def __init__(self, src: DataNode):
+        self.src: DataNode = src
+
+    def can_duplicate(self) -> bool:
+        """Check if the data node can be duplicated.
+
+        Returns:
+            bool: True if the data node can be duplicated, False otherwise.
+        """
+        return isinstance(self.src, _FileDataNodeMixin)
+
+    def duplicate_data(self, dest: DataNode):
+        """Duplicate the src data to the data of the destination data node.
+
+        Parameters:
+            dest (DataNode): The destination data node.
+
+        Raises:
+            NotImplementedError: If the data node type is not supported yet.
+            WrongDataNodeType: If the source and destination data nodes have different storage types.
+        """
+        if isinstance(self.src, _FileDataNodeMixin):
+            if self.src.storage_type() != dest.storage_type():
+                raise WrongDataNodeType("Source and destination data nodes must have the same storage type.")
+            self.src._duplicate_file(dest)
+        else:
+            raise NotImplementedError(f"Data node type '{self.src.storage_type()}' not supported for duplication yet.")
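
For orientation, here is a minimal usage sketch of the new service (the `src_dn`/`dest_dn` names and the helper are hypothetical; in the real flow the destination node is created by the scenario duplicator further down):

from taipy.core.data._data_duplicator import _DataDuplicator

def copy_data_if_possible(src_dn, dest_dn) -> bool:
    # Only file-based data nodes (CSV, Excel, JSON, Parquet, pickle) mix in
    # _FileDataNodeMixin, so can_duplicate() returns False for e.g. in-memory nodes.
    duplicator = _DataDuplicator(src_dn)
    if duplicator.can_duplicate():
        duplicator.duplicate_data(dest_dn)  # copies the file and repoints dest_dn's path
        return True
    return False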

+ 34 - 14
taipy/core/data/_data_manager.py

@@ -16,16 +16,18 @@ from taipy.common.config import Config
 from taipy.common.config._config import _Config
 
 from .._manager._manager import _Manager
+from .._repository._abstract_repository import _AbstractRepository
 from .._version._version_mixin import _VersionMixin
 from ..common.scope import Scope
 from ..config.data_node_config import DataNodeConfig
 from ..cycle.cycle_id import CycleId
 from ..exceptions.exceptions import InvalidDataNodeType
 from ..notification import Event, EventEntityType, EventOperation, Notifier, _make_event
-from ..reason import NotGlobalScope, ReasonCollection, WrongConfigType
+from ..reason import EntityDoesNotExist, NotGlobalScope, ReasonCollection, WrongConfigType
+from ..reason.reason import DataIsNotDuplicable
 from ..scenario.scenario_id import ScenarioId
 from ..sequence.sequence_id import SequenceId
-from ._data_fs_repository import _DataFSRepository
+from ._data_duplicator import _DataDuplicator
 from ._file_datanode_mixin import _FileDataNodeMixin
 from .data_node import DataNode
 from .data_node_id import DataNodeId
@@ -35,7 +37,18 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
     _DATA_NODE_CLASS_MAP = DataNode._class_map()  # type: ignore
     _ENTITY_NAME = DataNode.__name__
     _EVENT_ENTITY_TYPE = EventEntityType.DATA_NODE
-    _repository: _DataFSRepository
+    _repository: _AbstractRepository
+
+    @classmethod
+    def _get_owner_id(
+        cls, scope, cycle_id, scenario_id
+    ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
+        if scope == Scope.SCENARIO:
+            return scenario_id
+        elif scope == Scope.CYCLE:
+            return cycle_id
+        else:
+            return None
 
     @classmethod
     def _bulk_get_or_create(
@@ -47,20 +60,11 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
         data_node_configs = [Config.data_nodes[dnc.id] for dnc in data_node_configs]
         dn_configs_and_owner_id = []
         for dn_config in data_node_configs:
-            scope = dn_config.scope
-            owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
-            if scope == Scope.SCENARIO:
-                owner_id = scenario_id
-            elif scope == Scope.CYCLE:
-                owner_id = cycle_id
-            else:
-                owner_id = None
+            owner_id = cls._get_owner_id(dn_config.scope, cycle_id, scenario_id)
             dn_configs_and_owner_id.append((dn_config, owner_id))
-
         data_nodes = cls._repository._get_by_configs_and_owner_ids(
             dn_configs_and_owner_id, cls._build_filters_with_version(None)
         )
-
         return {
             dn_config: data_nodes.get((dn_config, owner_id)) or cls._create_and_set(dn_config, owner_id, None)
             for dn_config, owner_id in dn_configs_and_owner_id
@@ -166,7 +170,7 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
     @classmethod
     def _get_by_config_id(cls, config_id: str, version_number: Optional[str] = None) -> List[DataNode]:
         """
-        Get all datanodes by its config id.
+        Get all data nodes by their config id.
         """
         filters = cls._build_filters_with_version(version_number)
         if not filters:
@@ -174,3 +178,19 @@ class _DataManager(_Manager[DataNode], _VersionMixin):
         for fil in filters:
             fil.update({"config_id": config_id})
         return cls._repository._load_all(filters)
+
+    @classmethod
+    def _can_duplicate(cls, dn: Union[DataNodeId, DataNode]) -> ReasonCollection:
+        if isinstance(dn, DataNode):
+            dn_id = dn.id
+        else:
+            dn_id = dn
+        reason_collector = ReasonCollection()
+        if not cls._repository._exists(dn_id):
+            reason_collector._add_reason(dn_id, EntityDoesNotExist(dn_id))
+            return reason_collector
+        if not isinstance(dn, DataNode):
+            dn = cls._get(dn)
+        if not _DataDuplicator(dn).can_duplicate():
+            reason_collector._add_reason(dn_id, DataIsNotDuplicable(dn_id))
+        return reason_collector
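
A hedged sketch of how the new check reads (the data node id is illustrative): the returned ReasonCollection is truthy when duplication is possible, and otherwise carries one reason per blocking condition:

from taipy.core.data._data_manager import _DataManager

reasons = _DataManager._can_duplicate("DATANODE_sales_1234")  # accepts a DataNode or an id
if not reasons:
    # e.g. "Entity 'DATANODE_sales_1234' does not exist in the repository."
    # or   "Data of data node 'DATANODE_sales_1234' is not duplicable."
    print(reasons.reasons)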

+ 32 - 12
taipy/core/data/_file_datanode_mixin.py

@@ -42,6 +42,7 @@ class _FileDataNodeMixin:
     _PATH_KEY = "path"
     _DEFAULT_PATH_KEY = "default_path"
     _IS_GENERATED_KEY = "is_generated"
+    __TAIPY_DUPLICATE = "DUPLICATE_OF"
 
     __logger = _TaipyLogger._get_logger()
 
@@ -109,12 +110,14 @@ class _FileDataNodeMixin:
 
         return ""
 
-    def _upload(self,
-                path: str,
-                upload_checker: Optional[Callable[[str, Any], bool]] = None,
-                editor_id: Optional[str] = None,
-                comment: Optional[str] = None,
-                **kwargs: Any) -> ReasonCollection:
+    def _upload(
+        self,
+        path: str,
+        upload_checker: Optional[Callable[[str, Any], bool]] = None,
+        editor_id: Optional[str] = None,
+        comment: Optional[str] = None,
+        **kwargs: Any,
+    ) -> ReasonCollection:
         """Upload a file data to the data node.
 
         Arguments:
@@ -136,11 +139,15 @@ class _FileDataNodeMixin:
         from ._data_manager_factory import _DataManagerFactory
 
         reasons = ReasonCollection()
-        if (editor_id
-            and self.edit_in_progress # type: ignore[attr-defined]
-            and self.editor_id != editor_id # type: ignore[attr-defined]
-            and (not self.editor_expiration_date # type: ignore[attr-defined]
-                 or self.editor_expiration_date > datetime.now())):  # type: ignore[attr-defined]
+        if (
+            editor_id
+            and self.edit_in_progress  # type: ignore[attr-defined]
+            and self.editor_id != editor_id  # type: ignore[attr-defined]
+            and (
+                not self.editor_expiration_date  # type: ignore[attr-defined]
+                or self.editor_expiration_date > datetime.now()  # type: ignore[attr-defined]
+            )
+        ):
             reasons._add_reason(self.id, DataNodeEditInProgress(self.id))  # type: ignore[attr-defined]
             return reasons
 
@@ -161,7 +168,8 @@ class _FileDataNodeMixin:
                 self.__logger.error(
                     f"Error with the upload checker `{upload_checker.__name__}` "
                     f"while checking `{up_path.name}` file for upload to the data "
-                    f"node `{self.id}`:") # type: ignore[attr-defined]
+                    f"node `{self.id}`:"  # type: ignore[attr-defined]
+                )
                 self.__logger.error(f"Error: {err}")
                 can_upload = False
 
@@ -212,3 +220,15 @@ class _FileDataNodeMixin:
         if os.path.exists(old_path):
             shutil.move(old_path, new_path)
         return new_path
+
+    def _duplicate_file(self, dest: DataNode):
+        if os.path.exists(self._path):
+            folder_path, base_name = os.path.split(self._path)
+            new_path = os.path.join(folder_path, f"{dest.id}_{self.__TAIPY_DUPLICATE}_{base_name}")
+            if os.path.isdir(self._path):
+                shutil.copytree(self._path, new_path)
+            else:
+                shutil.copy(self._path, new_path)
+            normalize_path = _normalize_path(new_path)
+            dest._path = normalize_path
+            dest._properties[self._PATH_KEY] = normalize_path
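
To make the naming scheme concrete, a standalone sketch of the same copy logic outside the mixin (duplicating data/sales.csv for a destination node DATANODE_sales_1234 yields data/DATANODE_sales_1234_DUPLICATE_OF_sales.csv):

import os
import shutil

def duplicate_file(src_path: str, dest_id: str) -> str:
    # Mirrors _duplicate_file: prefix the copy with the destination node id and
    # the DUPLICATE_OF marker, so each duplicate gets a unique, traceable name.
    folder, base = os.path.split(src_path)
    new_path = os.path.join(folder, f"{dest_id}_DUPLICATE_OF_{base}")
    if os.path.isdir(src_path):
        shutil.copytree(src_path, new_path)  # folder-backed nodes
    else:
        shutil.copy(src_path, new_path)
    return new_path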

+ 29 - 20
taipy/core/data/data_node.py

@@ -433,22 +433,27 @@ class DataNode(_Entity, _Labeled):
                 corresponding to this write.
         """
         from ._data_manager_factory import _DataManagerFactory
-        if (editor_id
+
+        if (
+            editor_id
             and self.edit_in_progress
             and self.editor_id != editor_id
-            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+        ):
             raise DataNodeIsBeingEdited(self.id, self.editor_id)
         self._append(data)
         self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
         self.unlock_edit()
         _DataManagerFactory._build_manager()._set(self)
 
-    def write(self,
-              data,
-              job_id: Optional[JobId] = None,
-              editor_id: Optional[str] = None,
-              comment: Optional[str] = None,
-              **kwargs: Any):
+    def write(
+        self,
+        data,
+        job_id: Optional[JobId] = None,
+        editor_id: Optional[str] = None,
+        comment: Optional[str] = None,
+        **kwargs: Any,
+    ):
         """Write some data to this data node.
 
         once the data is written, the data node is unlocked and the edit is tracked.
@@ -461,10 +466,12 @@ class DataNode(_Entity, _Labeled):
             **kwargs (Any): Extra information to attach to the edit document
                 corresponding to this write.
         """
-        if (editor_id
+        if (
+            editor_id
             and self.edit_in_progress
             and self.editor_id != editor_id
-            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())):
+            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
+        ):
             raise DataNodeIsBeingEdited(self.id, self.editor_id)
         self._write(data)
         self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
@@ -473,12 +480,14 @@ class DataNode(_Entity, _Labeled):
 
         _DataManagerFactory._build_manager()._set(self)
 
-    def track_edit(self,
-                   job_id: Optional[str] = None,
-                   editor_id: Optional[str] = None,
-                   timestamp: Optional[datetime] = None,
-                   comment: Optional[str] = None,
-                   **options: Any):
+    def track_edit(
+        self,
+        job_id: Optional[str] = None,
+        editor_id: Optional[str] = None,
+        timestamp: Optional[datetime] = None,
+        comment: Optional[str] = None,
+        **options: Any,
+    ):
         """Creates and adds a new entry in the edits attribute without writing the data.
 
         Arguments:
@@ -627,15 +636,15 @@ class DataNode(_Entity, _Labeled):
             If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
         """
         if not scenario_config_id:
-            return 0xfffb
+            return 0xFFFB
         dn_config = Config.data_nodes.get(self._config_id, None)
         if not dn_config:
             self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
-            return 0xfffd
+            return 0xFFFD
         if not dn_config._ranks:
             self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
-            return 0xfffe
-        return dn_config._ranks.get(scenario_config_id, 0xfffc)
+            return 0xFFFE
+        return dn_config._ranks.get(scenario_config_id, 0xFFFC)
 
     @abstractmethod
     def _read(self):

+ 4 - 0
taipy/core/exceptions/exceptions.py

@@ -43,6 +43,10 @@ class MissingRequiredProperty(Exception):
     """Raised if a required property is missing when creating a Data Node."""
 
 
+class WrongDataNodeType(Exception):
+    """Raised if a data node storage type is incorrect."""
+
+
 class InvalidDataNodeType(Exception):
     """Raised if a data node storage type does not exist."""
 

+ 30 - 18
taipy/core/reason/reason.py

@@ -56,6 +56,18 @@ class _DataNodeReasonMixin:
         return _DataManagerFactory._build_manager()._get(self.datanode_id)
 
 
+class DataIsNotDuplicable(Reason, _DataNodeReasonMixin):
+    """
+    The data node can be duplicated, but not its data.
+
+    Attributes:
+        datanode_id (str): The identifier of the `DataNode^`.
+    """
+
+    def __init__(self, datanode_id: str):
+        Reason.__init__(self, f"Data of data node '{datanode_id}' is not duplicable")
+        _DataNodeReasonMixin.__init__(self, datanode_id)
+
 class DataNodeEditInProgress(Reason, _DataNodeReasonMixin):
     """
     A `DataNode^` is being edited, which prevents specific actions from being performed.
@@ -65,7 +77,7 @@ class DataNodeEditInProgress(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, datanode_id: str):
-        Reason.__init__(self, f"DataNode {datanode_id} is being edited")
+        Reason.__init__(self, f"DataNode '{datanode_id}' is being edited")
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
 
@@ -78,7 +90,7 @@ class DataNodeIsNotWritten(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, datanode_id: str):
-        Reason.__init__(self, f"DataNode {datanode_id} is not written")
+        Reason.__init__(self, f"DataNode '{datanode_id}' is not written")
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
 
@@ -91,7 +103,7 @@ class EntityIsNotSubmittableEntity(Reason):
     """
 
     def __init__(self, entity_id: str):
-        Reason.__init__(self, f"Entity {entity_id} is not a submittable entity")
+        Reason.__init__(self, f"Entity '{entity_id}' is not a submittable entity")
 
 
 class WrongConfigType(Reason):
@@ -105,9 +117,9 @@ class WrongConfigType(Reason):
 
     def __init__(self, config_id: str, config_type: Optional[str]):
         if config_type:
-            reason = f'Object "{config_id}" must be a valid {config_type}'
+            reason = f"Object '{config_id}' must be a valid {config_type}"
         else:
-            reason = f'Object "{config_id}" is not a valid config to be created'
+            reason = f"Object '{config_id}' is not a valid config to be created"
 
         Reason.__init__(self, reason)
 
@@ -121,7 +133,7 @@ class NotGlobalScope(Reason):
     """
 
     def __init__(self, config_id: str):
-        Reason.__init__(self, f'Data node config "{config_id}" does not have GLOBAL scope')
+        Reason.__init__(self, f"Data node config '{config_id}' does not have GLOBAL scope")
 
 
 class UploadFileCanNotBeRead(Reason, _DataNodeReasonMixin):
@@ -136,8 +148,8 @@ class UploadFileCanNotBeRead(Reason, _DataNodeReasonMixin):
     def __init__(self, file_name: str, datanode_id: str):
         Reason.__init__(
             self,
-            f"The uploaded file {file_name} can not be read, "
-            f'therefore is not a valid data file for data node "{datanode_id}"',
+            f"The uploaded file '{file_name}' can not be read, "
+            f"therefore is not a valid data file for data node '{datanode_id}'",
         )
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
@@ -152,7 +164,7 @@ class NoFileToDownload(Reason, _DataNodeReasonMixin):
 
     def __init__(self, file_path: str, datanode_id: str):
         Reason.__init__(
-            self, f"Path '{file_path}' from data node '{datanode_id}'" f" does not exist and cannot be downloaded"
+            self, f"Path '{file_path}' from data node '{datanode_id}' does not exist and cannot be downloaded"
         )
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
@@ -167,7 +179,7 @@ class NotAFile(Reason, _DataNodeReasonMixin):
 
     def __init__(self, file_path: str, datanode_id: str):
         Reason.__init__(
-            self, f"Path '{file_path}' from data node '{datanode_id}'" f" is not a file and can t be downloaded"
+            self, f"Path '{file_path}' from data node '{datanode_id}' is not a file and can t be downloaded"
         )
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
@@ -182,7 +194,7 @@ class InvalidUploadFile(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, file_name: str, datanode_id: str):
-        Reason.__init__(self, f'The uploaded file {file_name} has invalid data for data node "{datanode_id}"')
+        Reason.__init__(self, f"The uploaded file '{file_name}' has invalid data for data node '{datanode_id}'")
         _DataNodeReasonMixin.__init__(self, datanode_id)
 
 
@@ -195,7 +207,7 @@ class EntityDoesNotExist(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, entity_id: str):
-        Reason.__init__(self, f"Entity {entity_id} does not exist in the repository")
+        Reason.__init__(self, f"Entity '{entity_id}' does not exist in the repository")
 
 
 class JobIsNotFinished(Reason, _DataNodeReasonMixin):
@@ -207,7 +219,7 @@ class JobIsNotFinished(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, job_id: str):
-        Reason.__init__(self, f"The job {job_id} is not finished yet")
+        Reason.__init__(self, f"The job '{job_id}' is not finished yet")
 
 
 class EntityIsNotAScenario(Reason, _DataNodeReasonMixin):
@@ -219,7 +231,7 @@ class EntityIsNotAScenario(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, entity_id: str):
-        Reason.__init__(self, f"The entity {entity_id} is not a scenario")
+        Reason.__init__(self, f"The entity '{entity_id}' is not a scenario")
 
 
 class ScenarioIsThePrimaryScenario(Reason, _DataNodeReasonMixin):
@@ -232,7 +244,7 @@ class ScenarioIsThePrimaryScenario(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, scenario_id: str, cycle: str):
-        Reason.__init__(self, f"The scenario {scenario_id} is the primary scenario of cycle {cycle}")
+        Reason.__init__(self, f"The scenario '{scenario_id}' is the primary scenario of cycle '{cycle}'")
 
 
 class ScenarioDoesNotBelongToACycle(Reason, _DataNodeReasonMixin):
@@ -244,7 +256,7 @@ class ScenarioDoesNotBelongToACycle(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, scenario_id: str):
-        Reason.__init__(self, f"The scenario {scenario_id} does not belong to any cycle")
+        Reason.__init__(self, f"The scenario '{scenario_id}' does not belong to any cycle")
 
 
 class SubmissionIsNotFinished(Reason, _DataNodeReasonMixin):
@@ -256,7 +268,7 @@ class SubmissionIsNotFinished(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, submission_id: str):
-        Reason.__init__(self, f"The submission {submission_id} is not finished yet")
+        Reason.__init__(self, f"The submission '{submission_id}' is not finished yet")
 
 
 class SubmissionStatusIsUndefined(Reason, _DataNodeReasonMixin):
@@ -268,4 +280,4 @@ class SubmissionStatusIsUndefined(Reason, _DataNodeReasonMixin):
     """
 
     def __init__(self, submission_id: str):
-        Reason.__init__(self, f"The status of submission {submission_id} is undefined")
+        Reason.__init__(self, f"The status of submission '{submission_id}' is undefined")

+ 217 - 0
taipy/core/scenario/_scenario_duplicator.py

@@ -0,0 +1,217 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from datetime import datetime
+from typing import Dict, Optional, Set, Union
+
+from taipy.common.config import Config
+
+from ..common._listattributes import _ListAttributes
+from ..common.scope import Scope
+from ..cycle._cycle_manager_factory import _CycleManagerFactory
+from ..data._data_duplicator import _DataDuplicator
+from ..data._data_manager_factory import _DataManagerFactory
+from ..data.data_node import DataNode
+from ..notification import EventOperation, Notifier, _make_event
+from ..sequence.sequence import Sequence
+from ..task._task_manager_factory import _TaskManagerFactory
+from ..task.task import Task
+from .scenario import Scenario
+
+
+class _ScenarioDuplicator:
+    """A service to duplicate a scenario and related entities."""
+
+    def __init__(self, scenario: Scenario, data_to_duplicate: Union[bool, Set[str]] = True):
+        self.scenario: Scenario = scenario
+        if data_to_duplicate is True:
+            self.data_to_duplicate: Set[str] = set(self.scenario.data_nodes.keys())
+        elif isinstance(data_to_duplicate, set):
+            self.data_to_duplicate = data_to_duplicate
+        else:
+            self.data_to_duplicate = set()
+
+        self.new_scenario: Scenario = None  # type: ignore
+        self.new_cycle_id: Optional[str] = None
+        self.new_tasks: Dict[str, Task] = {}
+        self.new_data_nodes: Dict[str, DataNode] = {}
+
+        from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+        self.__scenario_manager = _ScenarioManagerFactory._build_manager()
+        self.__cycle_manager = _CycleManagerFactory._build_manager()
+        self.__task_manager = _TaskManagerFactory._build_manager()
+        self.__data_manager = _DataManagerFactory._build_manager()
+
+    def duplicate(self, new_creation_date: Optional[datetime] = None, new_name: Optional[str] = None) -> Scenario:
+        """Create a duplicated scenario with its related entities
+
+        Create a scenario with the same configuration as the original scenario, but with
+        a new creation date and name. Creation events are published for the new scenario,
+        tasks, and data nodes. The data nodes are duplicated if the `data_to_duplicate`
+        is set to True or a set of data node configuration ids. The new scenario is returned.
+
+        Arguments:
+            new_creation_date (Optional[datetime]): The creation date of the new scenario.
+                If not provided, the current date and time is used.
+            new_name (Optional[str]): The name of the new scenario. If not provided, the
+                name of the original scenario is used.
+
+        Returns:
+            The newly created scenario.
+        """
+        self.new_scenario = self.__init_new_scenario(new_creation_date or datetime.now(), new_name)
+        for dn in self.scenario.additional_data_nodes.values():
+            self.new_scenario._additional_data_nodes.add(self._duplicate_datanode(dn).id)  # type: ignore
+        for task in self.scenario.tasks.values():
+            self.new_scenario._tasks.add(self._duplicate_task(task).id)  # type: ignore
+        self._duplicate_sequences()
+        self.__scenario_manager._set(self.new_scenario)
+        Notifier.publish(_make_event(self.new_scenario, EventOperation.CREATION))
+        return self.new_scenario
+
+    def _duplicate_task(self, task: Task) -> Task:
+        if task.scope == Scope.GLOBAL:
+            # Task and children data nodes already exist. No need to duplicate.
+            self.new_tasks[task.config_id] = task
+            task._parent_ids.update([self.new_scenario.id])
+            self.__task_manager._repository._save(task) # Through the repository so we don't set data nodes
+            Notifier.publish(_make_event(task, EventOperation.UPDATE, "parent_ids", task._parent_ids))
+            return task
+        if task.scope == Scope.CYCLE and self.scenario.cycle.id == self.new_cycle_id:
+            # Task and children data nodes already exist. No need to duplicate.
+            self.new_tasks[task.config_id] = task
+            task._parent_ids.update([self.new_scenario.id])
+            self.__task_manager._repository._save(task) # Through the repository so we don't set data nodes
+            Notifier.publish(_make_event(task, EventOperation.UPDATE, "parent_ids", task._parent_ids))
+            return task
+        if task.scope == Scope.CYCLE:
+            existing_tasks = self.__task_manager._repository._get_by_configs_and_owner_ids(  # type: ignore
+                [(task.config_id, self.new_cycle_id)],
+                self.__task_manager._build_filters_with_version(None))
+            if existing_tasks:
+                # Task and children data nodes already exist. No need to duplicate.
+                existing_t = existing_tasks[(task.config_id, self.new_cycle_id)]
+                self.new_tasks[task.config_id] = existing_t
+                existing_t._parent_ids.update([self.new_scenario.id])
+                self.__task_manager._repository._save(existing_t)  # Don't set data nodes
+                Notifier.publish(_make_event(existing_t, EventOperation.UPDATE, "parent_ids", existing_t._parent_ids))
+                return existing_t
+
+        new_task = self.__init_new_task(task)
+        for input in task.input.values():
+            new_task._input[input.config_id] = self._duplicate_datanode(input, new_task)
+        for output in task.output.values():
+            new_task._output[output.config_id] = self._duplicate_datanode(output, new_task)
+        self.new_tasks[task.config_id] = new_task
+
+        self.__task_manager._set(new_task)
+        Notifier.publish(_make_event(new_task, EventOperation.CREATION))
+        return new_task
+
+    def _duplicate_datanode(self, dn: DataNode, task: Optional[Task] = None) -> DataNode:
+        if dn.config_id in self.new_data_nodes:
+            # Data node already created from another task. No need to duplicate.
+            new_dn = self.new_data_nodes[dn.config_id]
+            new_dn._parent_ids.update([task.id] if task else [self.new_scenario.id])
+            self.__data_manager._set(new_dn)
+            Notifier.publish(_make_event(new_dn, EventOperation.UPDATE, "parent_ids", new_dn._parent_ids))
+            return new_dn
+        if dn.scope == Scope.GLOBAL:
+            # Data node already exists. No need to duplicate.
+            dn._parent_ids.update([task.id] if task else [self.new_scenario.id])
+            self.__data_manager._set(dn)
+            Notifier.publish(_make_event(dn, EventOperation.UPDATE, "parent_ids", dn._parent_ids))
+            return dn
+        if dn.scope == Scope.CYCLE and self.scenario.cycle.id == self.new_cycle_id:
+            # Data node already exists. No need to duplicate.
+            dn._parent_ids.update([task.id] if task else [self.new_scenario.id])
+            self.__data_manager._set(dn)
+            Notifier.publish(_make_event(dn, EventOperation.UPDATE, "parent_ids", dn._parent_ids))
+            return dn
+        if dn.scope == Scope.CYCLE:
+            existing_dns = self.__data_manager._repository._get_by_configs_and_owner_ids(  # type: ignore
+                [(dn.config_id, self.new_cycle_id)],
+                self.__data_manager._build_filters_with_version(None))
+            if existing_dns.get((dn.config_id, self.new_cycle_id)):
+                ex_dn = existing_dns[(dn.config_id, self.new_cycle_id)]
+                # A cycle data node with same config and same cycle owner already exist. No need to duplicate it.
+                ex_dn._parent_ids.update([task.id] if task else [self.new_scenario.id])
+                self.__data_manager._set(ex_dn)
+                Notifier.publish(_make_event(ex_dn, EventOperation.UPDATE, "parent_ids", ex_dn._parent_ids))
+                return ex_dn
+
+        new_dn = self.__init_new_datanode(dn, task)
+        if new_dn._config_id in self.data_to_duplicate:
+            duplicator = _DataDuplicator(dn)
+            if duplicator.can_duplicate():
+                duplicator.duplicate_data(new_dn)
+
+        self.new_data_nodes[dn.config_id] = new_dn
+        self.__data_manager._set(new_dn)
+        Notifier.publish(_make_event(new_dn, EventOperation.CREATION))
+        return new_dn
+
+    def _duplicate_sequences(self):
+        new_sequences = {}
+        for seq_name, seq_data in self.scenario._sequences.items():
+            new_sequence_id = Sequence._new_id(seq_name, self.new_scenario.id)
+            new_sequence = {Scenario._SEQUENCE_PROPERTIES_KEY: seq_data[Scenario._SEQUENCE_PROPERTIES_KEY],
+                            Scenario._SEQUENCE_TASKS_KEY: []}  # We do not want to duplicate the subscribers
+            for task in seq_data[Scenario._SEQUENCE_TASKS_KEY]:
+                new_task = self.new_tasks[task.config_id]
+                new_task._parent_ids.update([new_sequence_id])
+                self.__task_manager._set(new_task)
+                new_sequence[Scenario._SEQUENCE_TASKS_KEY].append(self.new_tasks[task.config_id])
+            new_sequences[seq_name] = new_sequence
+        self.new_scenario._sequences = new_sequences
+
+    def __init_new_scenario(self, new_creation_date: datetime, new_name: Optional[str]) -> Scenario:
+        self.new_scenario = self.__scenario_manager._get(self.scenario)
+        self.new_scenario.id = self.new_scenario._new_id(self.scenario.config_id)
+        self.new_scenario._creation_date = new_creation_date
+        if frequency := Config.scenarios[self.scenario.config_id].frequency:
+            cycle = self.__cycle_manager._get_or_create(frequency, new_creation_date)
+            self.new_scenario._cycle = cycle
+            self.new_scenario._primary_scenario = len(self.__scenario_manager._get_all_by_cycle(cycle)) == 0
+            self.new_cycle_id = cycle.id
+        else:
+            self.new_scenario._primary_scenario = False
+        if hasattr(self.new_scenario._properties, "_entity_owner"):
+            self.new_scenario._properties._entity_owner = self.new_scenario
+        if new_name:
+            self.new_scenario._properties["name"] = new_name
+        self.new_scenario._subscribers = _ListAttributes(self.new_scenario, [])
+
+        self.new_scenario._tasks = set()  # To be potentially updated later
+        self.new_scenario._sequences = {}  # To be potentially updated later
+        self.new_scenario._additional_data_nodes = set()  # To be potentially updated later
+        return self.new_scenario
+
+    def __init_new_task(self, task: Task) -> Task:
+        new_task = self.__task_manager._get(task)
+        new_task.id = new_task._new_id(task.config_id)
+        new_task._owner_id = self.__task_manager._get_owner_id(task.scope, self.new_cycle_id, self.new_scenario.id)
+        new_task._parent_ids = {self.new_scenario.id}
+        if hasattr(new_task._properties, "_entity_owner"):
+            new_task._properties._entity_owner = new_task
+        new_task._input = {}  # To be potentially updated later
+        new_task._output = {}  # To be potentially updated later
+        return new_task
+
+    def __init_new_datanode(self, dn: DataNode, task: Optional[Task] = None) -> DataNode:
+        new_dn = self.__data_manager._get(dn)
+        new_dn.id = DataNode._new_id(dn._config_id)
+        new_dn._owner_id = self.new_scenario.id if dn.scope == Scope.SCENARIO else self.new_cycle_id
+        new_dn._parent_ids = {task.id} if task else {self.new_scenario.id}
+        if hasattr(new_dn._properties, "_entity_owner"):
+            new_dn._properties._entity_owner = new_dn
+        new_dn._last_edit_date = None  # To be potentially updated later
+        new_dn._edits = []  # To be potentially updated later
+        return new_dn
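
Tying the pieces together, a sketch of how the duplicator is driven (the "raw_data" config id is hypothetical): passing a set restricts file copying to those data node configs, while every other scenario-scoped node is recreated empty:

from taipy.core.scenario._scenario_duplicator import _ScenarioDuplicator

# Duplicate the whole scenario; data_to_duplicate defaults to True (copy all data).
clone = _ScenarioDuplicator(scenario).duplicate(new_name="scenario copy")

# Duplicate the scenario structure but only copy the data of the "raw_data" node.
partial = _ScenarioDuplicator(scenario, data_to_duplicate={"raw_data"}).duplicate()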

+ 36 - 0
taipy/core/scenario/_scenario_manager.py

@@ -51,6 +51,7 @@ from ..reason import (
 from ..submission._submission_manager_factory import _SubmissionManagerFactory
 from ..submission.submission import Submission
 from ..task._task_manager_factory import _TaskManagerFactory
+from ._scenario_duplicator import _ScenarioDuplicator
 from .scenario import Scenario
 from .scenario_id import ScenarioId
 
@@ -521,3 +522,38 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
         for fil in filters:
             fil.update({"config_id": config_id})
         return cls._repository._load_all(filters)
+
+    @classmethod
+    def _duplicate(
+        cls, scenario: Scenario, new_creation_date: Optional[datetime] = None, new_name: Optional[str] = None
+    ) -> Scenario:
+        """Create a duplicated scenario with its related entities.
+
+        Duplicate a scenario, publish a creation event and return the newly created
+        scenario.
+
+        Arguments:
+            scenario (Scenario): The scenario to duplicate.
+            new_creation_date (Optional[datetime]): The creation date of the new scenario.
+                If not provided, the current date and time is used.
+            new_name (Optional[str]): The name of the new scenario. If not provided, the
+                name of the original scenario is used.
+
+        Returns:
+            The newly created scenario.
+        """
+        reasons = cls._can_duplicate(scenario)
+        if not reasons:
+            raise Exception(reasons.reasons)
+        return _ScenarioDuplicator(scenario).duplicate(new_creation_date, new_name)
+
+    @classmethod
+    def _can_duplicate(cls, scenario: Union[str, Scenario]) -> ReasonCollection:
+        reason_collector = ReasonCollection()
+        if isinstance(scenario, Scenario):
+            scenario_id = scenario.id
+        else:
+            scenario_id = str(scenario)  # type: ignore
+        if not cls._repository._exists(scenario_id):
+            reason_collector._add_reason(scenario_id, EntityDoesNotExist(scenario_id))
+        return reason_collector

+ 38 - 0
taipy/core/scenario/scenario.py

@@ -414,6 +414,44 @@ class Scenario(_Entity, Submittable, _Labeled):
 
         return _ScenarioManagerFactory._build_manager()._submit(self, callbacks, force, wait, timeout, **properties)
 
+    def can_duplicate(self) -> ReasonCollection:
+        """Indicate if a scenario can be duplicated.
+
+        Arguments:
+            entity (Union[str, Scenario]): The scenario or its id to check if it can be duplicated.
+
+        Returns:
+            True if the given scenario can be duplicated. False otherwise.
+        """
+        from ._scenario_manager_factory import _ScenarioManagerFactory
+
+        return _ScenarioManagerFactory._build_manager()._can_duplicate(self)
+
+    def duplicate(
+        self,
+        new_creation_date: Optional[datetime] = None,
+        new_name: Optional[str] = None,
+    ) -> "Scenario":
+        """Duplicate the scenario and return the new one.
+
+        This method duplicates the scenario, optionally setting a new creation date and name.
+        The nested tasks and data nodes are duplicated as well. If the scenario belongs to a
+        cycle, the cycle (corresponding to the creation_date and the configuration frequency
+        attribute) is created if it does not exist yet.
+
+        Arguments:
+            new_creation_date (Optional[datetime.datetime]): The creation date of the new scenario.
+                If None, the current date and time is used.
+            new_name (Optional[str]): The displayable name of the new scenario.
+                If None, the name of the current scenario is used.
+
+        Returns:
+            Scenario: The newly duplicated scenario.
+        """
+        from ._scenario_manager_factory import _ScenarioManagerFactory
+
+        return _ScenarioManagerFactory._build_manager()._duplicate(self, new_creation_date, new_name)
+
     def set_primary(self) -> None:
         """Promote the scenario as the primary scenario of its cycle.
 

+ 40 - 8
taipy/core/taipy.py

@@ -1060,13 +1060,45 @@ def get_entities_by_config_id(
     Returns:
         The list of all entities by the config id.
     """
+    if scenarios := _ScenarioManagerFactory._build_manager()._get_by_config_id(config_id):
+        return scenarios
+    if tasks := _TaskManagerFactory._build_manager()._get_by_config_id(config_id):
+        return tasks
+    if data_nodes := _DataManagerFactory._build_manager()._get_by_config_id(config_id):
+        return data_nodes
+    return []
+
+
+def can_duplicate(entity: Union[str, Scenario]) -> ReasonCollection:
+    """Indicate if a scenario can be duplicated.
+
+    Arguments:
+        entity (Union[str, Scenario]): The scenario or its id to check if it can be duplicated.
+
+    Returns:
+        A ReasonCollection object that is True if the given scenario can be duplicated, and False otherwise.
+    """
+    return _ScenarioManagerFactory._build_manager()._can_duplicate(entity)
+
 
-    entities: List = []
+def duplicate_scenario(
+    scenario: Scenario, new_creation_date: Optional[datetime] = None, new_name: Optional[str] = None
+) -> Scenario:
+    """Duplicate an existing scenario and return a new scenario.
+
+    This function duplicates the provided scenario, optionally setting a new creation date and name.
+
+    If the scenario belongs to a cycle, the cycle (corresponding to the creation_date and the configuration
+    frequency attribute) is created if it does not exist yet.
+
+    Arguments:
+        scenario (Scenario): The scenario to duplicate.
+        new_creation_date (Optional[datetime.datetime]): The creation date of the new scenario.
+            If None, the current date and time is used.
+        new_name (Optional[str]): The displayable name of the new scenario.
+
+    Returns:
+        Scenario: The newly duplicated scenario.
+    """
 
-    if entities := _ScenarioManagerFactory._build_manager()._get_by_config_id(config_id):
-        return entities
-    if entities := _TaskManagerFactory._build_manager()._get_by_config_id(config_id):
-        return entities
-    if entities := _DataManagerFactory._build_manager()._get_by_config_id(config_id):
-        return entities
-    return entities
+    return _ScenarioManagerFactory._build_manager()._duplicate(scenario, new_creation_date, new_name)
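
End to end, the newly exposed entry points compose as in this hedged sketch (the configuration is hypothetical):

import taipy as tp
from taipy import Config

dn_cfg = Config.configure_pickle_data_node("raw_data")
task_cfg = Config.configure_task("process", print, [dn_cfg], [])
scenario_cfg = Config.configure_scenario("monthly_sales", [task_cfg])

scenario = tp.create_scenario(scenario_cfg)
if tp.can_duplicate(scenario):  # truthy ReasonCollection
    twin = tp.duplicate_scenario(scenario, new_name="monthly_sales copy")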

+ 23 - 7
taipy/core/task/_task_manager.py

@@ -57,6 +57,17 @@ class _TaskManager(_Manager[Task], _VersionMixin):
         cls.__save_data_nodes(task.output.values())
         super()._set(task)
 
+    @classmethod
+    def _get_owner_id(
+        cls, scope, cycle_id, scenario_id
+    ) -> Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]:
+        if scope == Scope.SCENARIO:
+            return scenario_id
+        elif scope == Scope.CYCLE:
+            return cycle_id
+        else:
+            return None
+
     @classmethod
     def _bulk_get_or_create(
         cls,
@@ -79,13 +90,7 @@ class _TaskManager(_Manager[Task], _VersionMixin):
             ]
             task_config_data_nodes = [data_nodes[dn_config] for dn_config in task_dn_configs]
             scope = min(dn.scope for dn in task_config_data_nodes) if len(task_config_data_nodes) != 0 else Scope.GLOBAL
-            owner_id: Union[Optional[SequenceId], Optional[ScenarioId], Optional[CycleId]]
-            if scope == Scope.SCENARIO:
-                owner_id = scenario_id
-            elif scope == Scope.CYCLE:
-                owner_id = cycle_id
-            else:
-                owner_id = None
+            owner_id = cls._get_owner_id(scope, cycle_id, scenario_id)
 
             tasks_configs_and_owner_id.append((task_config, owner_id))
 
@@ -226,3 +231,14 @@ class _TaskManager(_Manager[Task], _VersionMixin):
         for fil in filters:
             fil.update({"config_id": config_id})
         return cls._repository._load_all(filters)
+
+    @classmethod
+    def _can_duplicate(cls, task: Union[Task, TaskId]) -> ReasonCollection:
+        reason_collector = ReasonCollection()
+        if isinstance(task, Task):
+            task_id = task.id
+        else:
+            task_id = task
+        if not cls._repository._exists(task_id):
+            reason_collector._add_reason(task_id, EntityDoesNotExist(task_id))
+        return reason_collector

+ 11 - 18
taipy/core/task/task.py

@@ -76,22 +76,6 @@ class Task(_Entity, _Labeled):
             # Retrieve the list of all tasks
             all_tasks = tp.get_tasks()
         ```
-
-    Attributes:
-        config_id (str): The identifier of the `TaskConfig^`.
-        properties (dict[str, Any]): A dictionary of additional properties.
-        function (callable): The python function to execute. The _function_ must take as parameter the
-            data referenced by inputs data nodes, and must return the data referenced by outputs data nodes.
-        input (Union[DataNode^, List[DataNode^]]): The list of inputs.
-        output (Union[DataNode^, List[DataNode^]]): The list of outputs.
-        id (str): The unique identifier of the task.
-        owner_id (str):  The identifier of the owner (sequence_id, scenario_id, cycle_id) or None.
-        parent_ids (Optional[Set[str]]): The set of identifiers of the parent sequences.
-        version (str): The string indicates the application version of the task to instantiate. If not provided, the
-            latest version is used.
-        skippable (bool): If True, indicates that the task can be skipped if no change has been made on inputs. The
-            default value is _False_.
-
     """
 
     _ID_PREFIX = "TASK"
@@ -116,7 +100,7 @@ class Task(_Entity, _Labeled):
         skippable: bool = False,
     ) -> None:
         self._config_id = _validate_id(config_id)
-        self.id = id or TaskId(self.__ID_SEPARATOR.join([self._ID_PREFIX, self.config_id, str(uuid.uuid4())]))
+        self.id = id or self._new_id(config_id)
         self._owner_id = owner_id
         self._parent_ids = parent_ids or set()
         self._input = {dn.config_id: dn for dn in input or []}
@@ -127,6 +111,11 @@ class Task(_Entity, _Labeled):
         self._properties = _Properties(self, **properties)
         self._init_done = True
 
+    @staticmethod
+    def _new_id(config_id: str) -> TaskId:
+        """Generate a unique task identifier."""
+        return TaskId(Task.__ID_SEPARATOR.join([Task._ID_PREFIX, config_id, str(uuid.uuid4())]))
+
     def __hash__(self) -> int:
         return hash(self.id)
 
@@ -198,7 +187,11 @@ class Task(_Entity, _Labeled):
     @property  # type: ignore
     @_self_reload(_MANAGER_NAME)
     def function(self) -> Callable:
-        """The python function to execute."""
+        """The python function to execute.
+
+        The _function_ must take as parameter the data referenced by inputs data nodes,
+        and must return the data referenced by outputs data nodes.
+        """
         return self._function
 
     @function.setter  # type: ignore

+ 15 - 15
tests/core/_entity/test_ready_to_run_property.py

@@ -91,22 +91,22 @@ def test_scenario_not_submittable_if_one_input_edit_in_progress():
     dn_config_1 = Config.configure_in_memory_data_node("dn_1", 10)
     task_config = Config.configure_task("task", print, [dn_config_1], [])
     scenario_config = Config.configure_scenario("sc", [task_config])
-    scenario = scenario_manager._create(scenario_config)
-    dn_1 = scenario.dn_1
+    s = scenario_manager._create(scenario_config)
+    dn_1 = s.dn_1
     dn_1.lock_edit()
 
     assert not dn_1.is_ready_for_reading
-    assert not scenario_manager._is_submittable(scenario)
-    assert isinstance(scenario_manager._is_submittable(scenario), ReasonCollection)
+    assert not scenario_manager._is_submittable(s)
+    assert isinstance(scenario_manager._is_submittable(s), ReasonCollection)
 
-    assert scenario.id in _ReadyToRunProperty._submittable_id_datanodes
-    assert dn_1.id in _ReadyToRunProperty._submittable_id_datanodes[scenario.id]._reasons
+    assert s.id in _ReadyToRunProperty._submittable_id_datanodes
+    assert dn_1.id in _ReadyToRunProperty._submittable_id_datanodes[s.id]._reasons
     assert dn_1.id in _ReadyToRunProperty._datanode_id_submittables
-    assert scenario.id in _ReadyToRunProperty._datanode_id_submittables[dn_1.id]
-    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id]._reasons[dn_1.id] == {
+    assert s.id in _ReadyToRunProperty._datanode_id_submittables[dn_1.id]
+    assert _ReadyToRunProperty._submittable_id_datanodes[s.id]._reasons[dn_1.id] == {
         DataNodeEditInProgress(dn_1.id)
     }
-    assert _ReadyToRunProperty._submittable_id_datanodes[scenario.id].reasons == f"DataNode {dn_1.id} is being edited."
+    assert _ReadyToRunProperty._submittable_id_datanodes[s.id].reasons == f"DataNode '{dn_1.id}' is being edited."
 
 
 def test_scenario_not_submittable_for_multiple_reasons():
@@ -142,8 +142,8 @@ def test_scenario_not_submittable_for_multiple_reasons():
         DataNodeIsNotWritten(dn_2.id),
     }
     reason_str = _ReadyToRunProperty._submittable_id_datanodes[scenario.id].reasons
-    assert f"DataNode {dn_2.id} is being edited" in reason_str
-    assert f"DataNode {dn_2.id} is not written" in reason_str
+    assert f"DataNode '{dn_2.id}' is being edited" in reason_str
+    assert f"DataNode '{dn_2.id}' is not written" in reason_str
 
 
 def test_writing_input_remove_reasons():
@@ -166,8 +166,8 @@ def test_writing_input_remove_reasons():
         DataNodeIsNotWritten(dn_1.id),
     }
     reason_str = _ReadyToRunProperty._submittable_id_datanodes[scenario.id].reasons
-    assert f"DataNode {dn_1.id} is being edited" in reason_str
-    assert f"DataNode {dn_1.id} is not written" in reason_str
+    assert f"DataNode '{dn_1.id}' is being edited" in reason_str
+    assert f"DataNode '{dn_1.id}' is not written" in reason_str
 
     dn_1.write(10)
     assert scenario_manager._is_submittable(scenario)
@@ -192,8 +192,8 @@ def __assert_not_submittable_becomes_submittable_when_dn_edited(entity, manager,
         DataNodeIsNotWritten(dn.id),
     }
     reason_str = _ReadyToRunProperty._submittable_id_datanodes[entity.id].reasons
-    assert f"DataNode {dn.id} is being edited" in reason_str
-    assert f"DataNode {dn.id} is not written" in reason_str
+    assert f"DataNode '{dn.id}' is being edited" in reason_str
+    assert f"DataNode '{dn.id}' is not written" in reason_str
 
     dn.write("ANY VALUE")
     assert manager._is_submittable(entity)

+ 2 - 2
tests/core/_manager/test_manager.py

@@ -168,7 +168,7 @@ class TestManager:
 
         rc = MockManager._is_editable("some_entity")
         assert not rc
-        assert "Entity some_entity does not exist in the repository." in rc.reasons
+        assert "Entity 'some_entity' does not exist in the repository." in rc.reasons
 
     def test_is_readable(self):
         m = MockEntity("uuid", "Foo")
@@ -177,4 +177,4 @@ class TestManager:
 
         rc = MockManager._is_editable("some_entity")
         assert not rc
-        assert "Entity some_entity does not exist in the repository." in rc.reasons
+        assert "Entity 'some_entity' does not exist in the repository." in rc.reasons

+ 8 - 8
tests/core/data/test_csv_data_node.py

@@ -349,8 +349,8 @@ class TestCSVDataNode:
         reasons = dn._upload(not_exists_csv_path, upload_checker=check_data_column)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.csv can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.csv' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_csv_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_csv").strpath
@@ -360,7 +360,7 @@ class TestCSVDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_csv has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_csv' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_csv_path = tmpdir_factory.mktemp("data").join("wrong_format_df.csv").strpath
@@ -370,7 +370,7 @@ class TestCSVDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.csv has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.csv' has invalid data for data node '{dn.id}'"
         )
 
         assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails
@@ -399,8 +399,8 @@ class TestCSVDataNode:
         reasons = dn._upload(not_exists_csv_path, upload_checker=check_data_is_positive)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.csv can not be read"
-            f', therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.csv' can not be read"
+            f", therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_csv_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_csv").strpath
@@ -410,7 +410,7 @@ class TestCSVDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_csv has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_csv' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_csv_path = tmpdir_factory.mktemp("data").join("wrong_format_df.csv").strpath
@@ -420,7 +420,7 @@ class TestCSVDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.csv has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.csv' has invalid data for data node '{dn.id}'"
         )
 
         np.array_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails

+ 20 - 3
tests/core/data/test_data_manager.py

@@ -24,7 +24,7 @@ from taipy.core.data.data_node_id import DataNodeId
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.data.pickle import PickleDataNode
 from taipy.core.exceptions.exceptions import InvalidDataNodeType, ModelNotFound
-from taipy.core.reason import NotGlobalScope, WrongConfigType
+from taipy.core.reason import EntityDoesNotExist, NotGlobalScope, WrongConfigType
 from tests.core.utils.named_temporary_file import NamedTemporaryFile
 
 
@@ -65,13 +65,13 @@ class TestDataManager:
         assert reasons._reasons[dn_config.id] == {NotGlobalScope(dn_config.id)}
         assert (
             str(list(reasons._reasons[dn_config.id])[0])
-            == f'Data node config "{dn_config.id}" does not have GLOBAL scope'
+            == f"Data node config '{dn_config.id}' does not have GLOBAL scope"
         )
 
         reasons = _DataManager._can_create(1)
         assert bool(reasons) is False
         assert reasons._reasons["1"] == {WrongConfigType("1", DataNodeConfig.__name__)}
-        assert str(list(reasons._reasons["1"])[0]) == 'Object "1" must be a valid DataNodeConfig'
+        assert str(list(reasons._reasons["1"])[0]) == "Object '1' must be a valid DataNodeConfig"
 
     def test_create_data_node_with_name_provided(self):
         dn_config = Config.configure_data_node(id="dn", foo="bar", name="acb")
@@ -731,3 +731,20 @@ class TestDataManager:
 
         assert len(_DataManager._get_by_config_id(dn_config_1.id)) == 3
         assert len(_DataManager._get_by_config_id(dn_config_2.id)) == 2
+
+    def test_can_duplicate(self):
+        dn_config = Config.configure_data_node("dn_1")
+        dn = _DataManager._create_and_set(dn_config, None, None)
+
+        reasons = _DataManager._can_duplicate(dn.id)
+        assert bool(reasons)
+        assert reasons._reasons == {}
+
+        reasons = _DataManager._can_duplicate(dn)
+        assert bool(reasons)
+        assert reasons._reasons == {}
+
+        reasons = _DataManager._can_duplicate("1")
+        assert not bool(reasons)
+        assert reasons._reasons["1"] == {EntityDoesNotExist("1")}
+        assert str(list(reasons._reasons["1"])[0]) == "Entity '1' does not exist in the repository"
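
The new `_can_duplicate` guard follows the same ReasonCollection convention as the existing `_is_editable`/`_is_deletable` checks: a truthy collection means no blocking reason. A minimal sketch of that contract at the manager level, mirroring the test above (config ids are illustrative; the actual duplication itself goes through the duplicator service added by this commit):

    from taipy import Config
    from taipy.core.data._data_manager import _DataManager
    from taipy.core.reason import EntityDoesNotExist

    dn = _DataManager._create_and_set(Config.configure_data_node("dn_1"), None, None)

    assert _DataManager._can_duplicate(dn)          # truthy: no blocking reason
    reasons = _DataManager._can_duplicate("ghost")  # unknown entity id
    assert not reasons                              # falsy: duplication is blocked
    assert EntityDoesNotExist("ghost") in reasons._reasons["ghost"]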

+ 9 - 9
tests/core/data/test_excel_data_node.py

@@ -573,8 +573,8 @@ class TestExcelDataNode:
         reasons = dn._upload(not_exists_xlsx_path, upload_checker=check_data_column)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.xlsx can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.xlsx' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_xlsx_path = tmpdir_factory.mktemp("data").join("wrong_format_df.xlsm").strpath
@@ -584,7 +584,7 @@ class TestExcelDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.xlsm has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.xlsm' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_xlsx_path = tmpdir_factory.mktemp("data").join("wrong_format_df.xlsx").strpath
@@ -594,7 +594,7 @@ class TestExcelDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.xlsx has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.xlsx' has invalid data for data node '{dn.id}'"
         )
 
         assert_frame_equal(dn.read()["Sheet1"], old_data)  # The content of the dn should not change when upload fails
@@ -623,18 +623,18 @@ class TestExcelDataNode:
         reasons = dn._upload(not_exists_xlsx_path, upload_checker=check_data_is_positive)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.xlsx can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.xlsx' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         wrong_format_not_excel_path = tmpdir_factory.mktemp("data").join("wrong_format_df.xlsm").strpath
         pd.DataFrame(old_data).to_excel(wrong_format_not_excel_path, index=False)
-        # The upload should fail when the file is not a excel
+        # The upload should fail when the file is not an Excel file
         reasons = dn._upload(wrong_format_not_excel_path, upload_checker=check_data_is_positive)
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.xlsm has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.xlsm' has invalid data for data node '{dn.id}'"
         )
 
         not_xlsx_path = tmpdir_factory.mktemp("data").join("wrong_format_df.xlsx").strpath
@@ -643,7 +643,7 @@ class TestExcelDataNode:
         reasons = dn._upload(not_xlsx_path, upload_checker=check_data_is_positive)
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.xlsx has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.xlsx' has invalid data for data node '{dn.id}'"
         )
 
         np.array_equal(dn.read()["Sheet1"], old_data)  # The content of the dn should not change when upload fails

+ 49 - 0
tests/core/data/test_file_datanode_mixin.py

@@ -0,0 +1,49 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import filecmp
+import os
+import pathlib
+
+from taipy import Scope
+from taipy.core.common._utils import _normalize_path
+from taipy.core.data._data_manager import _DataManager
+from taipy.core.data.csv import CSVDataNode
+
+
+def test_duplicate_data():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample", "example.csv")
+    src = CSVDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+    copy = CSVDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+    copy_2 = CSVDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+    copy_copy = CSVDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+    _DataManager._set(src)
+
+    src._duplicate_file(copy)
+    _DataManager._set(copy)
+    assert _normalize_path(src.path) == _normalize_path(path)
+    assert _normalize_path(src.path) != _normalize_path(copy.path)
+    assert filecmp.cmp(path, copy.path)
+    assert src.path.count("DUPLICATE_OF") == 0
+    assert copy.path.count("DUPLICATE_OF") == 1
+
+    src._duplicate_file(copy_2)
+    _DataManager._set(copy_2)
+    assert _normalize_path(copy.path) != _normalize_path(copy_2.path)
+    assert copy_2.path.count("DUPLICATE_OF") == 1
+
+    copy._duplicate_file(copy_copy)
+    _DataManager._set(copy_copy)
+    assert copy_copy.path.count("DUPLICATE_OF") == 2
+
+    os.unlink(copy.path)
+    os.unlink(copy_2.path)
+    os.unlink(copy_copy.path)
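
In short: duplicating a file-based data node copies its backing file under a new name carrying a "DUPLICATE_OF" prefix, the source path is left untouched, and the prefixes stack one per duplication level, which is why `copy_copy` above ends up with two. A compressed sketch of that behavior (paths are illustrative and `_duplicate_file` is internal):

    from taipy import Scope
    from taipy.core.data.csv import CSVDataNode

    src = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "example.csv"})
    copy = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "example.csv"})

    src._duplicate_file(copy)            # copies example.csv to a new file...
    assert "DUPLICATE_OF" in copy.path   # ...whose name carries the prefix
    assert "DUPLICATE_OF" not in src.path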

+ 4 - 4
tests/core/data/test_json_data_node.py

@@ -460,8 +460,8 @@ class TestJSONDataNode:
         reasons = dn._upload(not_exists_json_path, upload_checker=check_data_keys)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.json can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.json' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_json_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_json").strpath
@@ -472,7 +472,7 @@ class TestJSONDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_json has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_json' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_json_path = tmpdir_factory.mktemp("data").join("wrong_format_df.json").strpath
@@ -483,7 +483,7 @@ class TestJSONDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.json has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.json' has invalid data for data node '{dn.id}'"
         )
 
         assert dn.read() == old_data  # The content of the dn should not change when upload fails

+ 8 - 8
tests/core/data/test_parquet_data_node.py

@@ -320,8 +320,8 @@ class TestParquetDataNode:
         reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_column)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.parquet' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
@@ -331,7 +331,7 @@ class TestParquetDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_parquet' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
@@ -343,7 +343,7 @@ class TestParquetDataNode:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.parquet' has invalid data for data node '{dn.id}'"
         )
 
         assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails
@@ -372,8 +372,8 @@ class TestParquetDataNode:
         reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_is_positive)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.parquet' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
@@ -382,7 +382,7 @@ class TestParquetDataNode:
         reasons = dn._upload(not_parquet_path, upload_checker=check_data_is_positive)
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_parquet' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
@@ -393,7 +393,7 @@ class TestParquetDataNode:
         reasons = dn._upload(wrong_format_parquet_path, upload_checker=check_data_is_positive)
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.parquet' has invalid data for data node '{dn.id}'"
         )
 
         np.array_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails

+ 4 - 4
tests/core/data/test_pickle_data_node.py

@@ -273,8 +273,8 @@ class TestPickleDataNodeEntity:
         reasons = dn._upload(not_exists_json_path, upload_checker=check_data_column)
         assert bool(reasons) is False
         assert (
-            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.json can not be read,"
-            f' therefore is not a valid data file for data node "{dn.id}"'
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file 'not_exists.json' can not be read,"
+            f" therefore is not a valid data file for data node '{dn.id}'"
         )
 
         not_pickle_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_pickle").strpath
@@ -285,7 +285,7 @@ class TestPickleDataNodeEntity:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.not_pickle has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.not_pickle' has invalid data for data node '{dn.id}'"
         )
 
         wrong_format_pickle_path = tmpdir_factory.mktemp("data").join("wrong_format_df.p").strpath
@@ -296,7 +296,7 @@ class TestPickleDataNodeEntity:
         assert bool(reasons) is False
         assert (
             str(list(reasons._reasons[dn.id])[0])
-            == f'The uploaded file wrong_format_df.p has invalid data for data node "{dn.id}"'
+            == f"The uploaded file 'wrong_format_df.p' has invalid data for data node '{dn.id}'"
         )
 
         assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails

+ 1 - 1
tests/core/job/test_job_manager.py

@@ -414,7 +414,7 @@ def test_is_deletable():
 
     rc = _JobManager._is_deletable("some_job")
     assert not rc
-    assert "Entity some_job does not exist in the repository." in rc.reasons
+    assert "Entity 'some_job' does not exist in the repository." in rc.reasons
 
     assert job.is_completed()
     assert _JobManager._is_deletable(job)

+ 640 - 0
tests/core/scenario/test_scenario_duplicator.py

@@ -0,0 +1,640 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from datetime import datetime, timedelta
+
+from taipy import Config, Frequency, Scope, Sequence
+from taipy.core.cycle._cycle_manager import _CycleManager
+from taipy.core.data._data_manager import _DataManager
+from taipy.core.job._job_manager import _JobManager
+from taipy.core.scenario._scenario_duplicator import _ScenarioDuplicator
+from taipy.core.scenario._scenario_manager import _ScenarioManager
+from taipy.core.sequence._sequence_manager import _SequenceManager
+from taipy.core.submission._submission_manager import _SubmissionManager
+from taipy.core.task._task_manager import _TaskManager
+
+
+def identity(x):
+    return x
+
+def test_constructor():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1")
+    additional_dn_config_1 = Config.configure_data_node("additional_dn_1")
+    task_config_1 = Config.configure_task("task_1", print, [dn_config_1], [])
+    scenario_config_1 = Config.configure_scenario("scenario_1", [task_config_1], [additional_dn_config_1])
+    scenario = _ScenarioManager._create(scenario_config_1)
+
+    duplicator = _ScenarioDuplicator(scenario)
+    assert duplicator.scenario == scenario
+    assert duplicator.data_to_duplicate == {"dn_1", "additional_dn_1"}
+
+    duplicator = _ScenarioDuplicator(scenario, True)
+    assert duplicator.scenario == scenario
+    assert duplicator.data_to_duplicate == {"dn_1", "additional_dn_1"}
+
+    duplicator = _ScenarioDuplicator(scenario, False)
+    assert duplicator.scenario == scenario
+    assert duplicator.data_to_duplicate == set()
+
+    duplicator = _ScenarioDuplicator(scenario, {"dn_1"})
+    assert duplicator.scenario == scenario
+    assert duplicator.data_to_duplicate == {"dn_1"}
+
+    duplicator = _ScenarioDuplicator(scenario, {"additional_dn_1"})
+    assert duplicator.scenario == scenario
+    assert duplicator.data_to_duplicate == {"additional_dn_1"}
+
+
+def test_duplicate_scenario_scoped_dns_no_cycle_one_sequence():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1", default_data="data1", key1="value1")
+    dn_config_2 = Config.configure_pickle_data_node("dn_2", key1="value2")
+    additional_dn_config_1 = Config.configure_data_node("additional_dn_1")
+    task_config_1 = Config.configure_task("task_1",
+                                          print,
+                                          input=[dn_config_1],
+                                          output=[dn_config_2],
+                                          skippable=True,
+                                          k="v")
+    scenario_config_1 = Config.configure_scenario("scenario_1", [task_config_1], [additional_dn_config_1])
+    creation_date = datetime.now() - timedelta(minutes=1)
+    name = "original"
+    scenario = _ScenarioManager._create(scenario_config_1, creation_date, name)
+    scenario.properties["key"] = "value"
+    scenario.add_sequence("sequence_1", [scenario.task_1], {"key_seq": "value_seq"})
+    scenario.submit()
+    assert len(_ScenarioManager._get_all()) == 1
+    assert len(_DataManager._get_all()) == 3
+    assert len(_TaskManager._get_all()) == 1
+    assert len(_SequenceManager._get_all()) == 1
+    assert len(_CycleManager._get_all()) == 0
+    assert len(_SubmissionManager._get_all()) == 1
+    assert len(_JobManager._get_all()) == 1
+
+    new_scenario = _ScenarioDuplicator(scenario, False).duplicate()
+
+    # Check scenario attributes
+    assert scenario.id != new_scenario.id
+    assert scenario.config_id == new_scenario.config_id == "scenario_1"
+    assert scenario.name == name
+    assert new_scenario.name == name
+    assert scenario.creation_date == creation_date
+    assert new_scenario.creation_date > creation_date
+    assert scenario.cycle is None
+    assert new_scenario.cycle is None
+
+    # Check task attributes
+    assert len(scenario.tasks) == len(new_scenario.tasks) == 1
+    task = scenario.tasks["task_1"]
+    new_task = new_scenario.tasks["task_1"]
+    assert task.id != new_task.id
+    assert task._config_id == new_task._config_id == "task_1"
+    assert task._owner_id == scenario.id
+    assert new_task._owner_id == new_scenario.id
+    assert task._parent_ids == {scenario.id, Sequence._new_id("sequence_1", scenario.id)}
+    assert new_task._parent_ids == {new_scenario.id, Sequence._new_id("sequence_1", new_scenario.id)}
+
+    assert task._function == new_task._function
+    assert task._skippable == new_task._skippable is True
+    assert task._properties == new_task._properties == {"k": "v"}
+
+    # Check data node attributes
+    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 3
+    def assert_dn(config_id, ppties, additional_dn=False):
+        dn = scenario.data_nodes[config_id]
+        new_dn = new_scenario.data_nodes[config_id]
+        assert dn.id != new_dn.id
+        assert dn._config_id == new_dn._config_id == config_id
+        assert dn._owner_id == scenario.id
+        assert new_dn._owner_id == new_scenario.id
+        if additional_dn:
+            assert dn._parent_ids == {scenario.id}
+            assert new_dn._parent_ids == {new_scenario.id}
+        else:
+            assert dn._parent_ids == {task.id}
+            assert new_dn._parent_ids == {new_task.id}
+        assert dn._scope == new_dn._scope == Scope.SCENARIO
+        assert len(dn._properties) == len(new_dn._properties)
+        for k, v in dn._properties.items():
+            assert new_dn._properties[k] == v
+        for k, v in ppties.items():
+            assert dn._properties[k] == new_dn._properties[k] == v
+        # assert dn_1._last_edit_date < new_dn_1._last_edit_date
+        # assert dn_1.edits
+        # assert new_dn_1.edits
+
+    assert_dn("dn_1", {"key1": "value1"})
+    assert_dn("dn_2", {"key1": "value2"})
+    assert_dn("additional_dn_1", {}, additional_dn=True)
+
+    # Check sequence attributes
+    assert len(scenario.sequences) == len(new_scenario.sequences) == 1
+    sequence = scenario.sequences["sequence_1"]
+    new_sequence = new_scenario.sequences["sequence_1"]
+    assert sequence.id != new_sequence.id
+    assert len(sequence._tasks) == len(new_sequence._tasks) == 1
+    assert sequence.tasks["task_1"].id == task.id
+    assert new_sequence.tasks["task_1"].id == new_task.id
+    assert sequence._owner_id == scenario.id
+    assert new_sequence._owner_id == new_scenario.id
+    assert sequence._parent_ids == {scenario.id}
+    assert new_sequence._parent_ids == {new_scenario.id}
+    assert len(sequence._properties) == len(new_sequence._properties)
+    for k, v in sequence._properties.items():
+        assert new_sequence._properties[k] == v
+    assert sequence._properties["key_seq"] == new_sequence._properties["key_seq"] == "value_seq"
+
+    # Check cycles, submissions and jobs are not duplicated
+    assert len(_ScenarioManager._get_all()) == 2
+    assert len(_DataManager._get_all()) == 6
+    assert len(_TaskManager._get_all()) == 2
+    assert len(_SequenceManager._get_all()) == 2
+    assert len(_CycleManager._get_all()) == 0
+    assert len(_SubmissionManager._get_all()) == 1
+    assert len(_JobManager._get_all()) == 1
+
+
+def test_duplicate_same_cycle():
+    dn_cfg_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
+    dn_cfg_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
+    dn_cfg_3 = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
+    dn_cfg_4 = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
+    t_cfg_1 = Config.configure_task("task_1", identity, input=[dn_cfg_1], output=[dn_cfg_2])
+    t_cfg_2 = Config.configure_task("task_2", identity, input=[dn_cfg_2], output=[dn_cfg_3])
+    t_cfg_3 = Config.configure_task("task_3", identity, input=[dn_cfg_3], output=[dn_cfg_4])
+    s_cfg_1 = Config.configure_scenario("scenario_1", [t_cfg_1, t_cfg_2, t_cfg_3], frequency=Frequency.DAILY)
+    creation_date = datetime.now() - timedelta(days=2)
+    name = "original"
+    scenario = _ScenarioManager._create(s_cfg_1, creation_date, name)
+
+    assert len(_ScenarioManager._get_all()) == 1
+    assert len(_TaskManager._get_all()) == 3
+    assert len(_DataManager._get_all()) == 4
+
+    new_name = "new"
+    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(creation_date, new_name)
+
+    assert len(_ScenarioManager._get_all()) == 2
+    assert len(_TaskManager._get_all()) == 5
+    assert len(_DataManager._get_all()) == 6
+
+    # Check scenario attributes
+    assert scenario.id != new_scenario.id
+    assert scenario.config_id == new_scenario.config_id == "scenario_1"
+    assert scenario.name == name
+    assert new_scenario.name == new_name
+    assert scenario.creation_date == new_scenario.creation_date == creation_date
+    assert scenario.cycle.id == new_scenario.cycle.id
+    assert len(scenario.sequences) == len(new_scenario.sequences) == 0
+
+    # Check tasks attributes
+    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
+    task_1 = scenario.tasks["task_1"]
+    new_task_1 = new_scenario.tasks["task_1"]
+    assert task_1 == new_task_1
+    assert task_1.id == new_task_1.id
+    assert task_1._config_id == new_task_1._config_id == "task_1"
+    assert task_1._owner_id == new_task_1._owner_id == scenario.cycle.id
+    assert task_1._parent_ids == new_task_1._parent_ids == {scenario.id, new_scenario.id}
+    assert task_1._function == new_task_1._function
+    assert task_1._skippable == new_task_1._skippable is False
+    assert task_1._properties == new_task_1._properties == {}
+
+    task_2 = scenario.tasks["task_2"]
+    new_task_2 = new_scenario.tasks["task_2"]
+    assert task_2.id != new_task_2.id
+    assert task_2._config_id == new_task_2._config_id == "task_2"
+    assert task_2._owner_id == scenario.id
+    assert new_task_2._owner_id == new_scenario.id
+    assert task_2._parent_ids == {scenario.id}
+    assert new_task_2._parent_ids == {new_scenario.id}
+    assert task_2._function == new_task_2._function
+    assert task_2._skippable == new_task_2._skippable is False
+    assert task_2._properties == new_task_2._properties == {}
+
+    task_3 = scenario.tasks["task_3"]
+    new_task_3 = new_scenario.tasks["task_3"]
+    assert task_3.id != new_task_3.id
+    assert task_3._config_id == new_task_3._config_id == "task_3"
+    assert task_3._owner_id == scenario.id
+    assert new_task_3._owner_id == new_scenario.id
+    assert task_3._parent_ids == {scenario.id}
+    assert new_task_3._parent_ids == {new_scenario.id}
+    assert task_3._function == new_task_3._function
+    assert task_3._skippable == new_task_3._skippable is False
+    assert task_3._properties == new_task_3._properties == {}
+
+    # Check data node attributes
+    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
+    dn_1 = scenario.data_nodes["dn_1"]
+    new_dn_1 = new_scenario.data_nodes["dn_1"]
+    assert dn_1.id == new_dn_1.id
+    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
+    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
+    assert dn_1._owner_id == new_dn_1._owner_id is None
+    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
+    assert dn_1._last_edit_date == new_dn_1._last_edit_date
+    assert dn_1._edits == new_dn_1._edits
+    assert len(dn_1._properties) == len(new_dn_1._properties)
+    for k, v in dn_1._properties.items():
+        assert new_dn_1._properties[k] == v
+
+    dn_2 = scenario.dn_2
+    new_dn_2 = new_scenario.dn_2
+    assert dn_2.id == new_dn_2.id
+    assert dn_2._config_id == new_dn_2._config_id == "dn_2"
+    assert dn_2._scope == new_dn_2._scope == Scope.CYCLE
+    assert dn_2._owner_id == new_dn_2._owner_id == scenario.cycle.id
+    assert dn_2._parent_ids == {task_1.id, task_2.id, new_task_2.id}
+    assert new_dn_2._parent_ids == {task_1.id, task_2.id, new_task_2.id}
+    assert dn_2._last_edit_date == new_dn_2._last_edit_date
+    assert dn_2._edits == new_dn_2._edits
+    assert len(dn_2._properties) == len(new_dn_2._properties)
+    for k, v in dn_2._properties.items():
+        assert new_dn_2._properties[k] == v
+
+    dn_3 = scenario.data_nodes["dn_3"]
+    new_dn_3 = new_scenario.data_nodes["dn_3"]
+    assert dn_3.id != new_dn_3.id
+    assert dn_3._config_id == new_dn_3._config_id == "dn_3"
+    assert dn_3._scope == new_dn_3._scope == Scope.SCENARIO
+    assert dn_3._owner_id == scenario.id
+    assert new_dn_3._owner_id == new_scenario.id
+    assert dn_3._parent_ids == {task_2.id, task_3.id}
+    assert new_dn_3._parent_ids == {new_task_2.id, new_task_3.id}
+    assert dn_3._last_edit_date == new_dn_3._last_edit_date
+    assert dn_3._edits == new_dn_3._edits
+    assert len(dn_3._properties) == len(new_dn_3._properties)
+    for k, v in dn_3._properties.items():
+        assert new_dn_3._properties[k] == v
+
+    dn_4 = scenario.data_nodes["dn_4"]
+    new_dn_4 = new_scenario.data_nodes["dn_4"]
+    assert dn_4.id != new_dn_4.id
+    assert dn_4._config_id == new_dn_4._config_id == "dn_4"
+    assert dn_4._scope == new_dn_4._scope == Scope.SCENARIO
+    assert dn_4._owner_id == scenario.id
+    assert new_dn_4._owner_id == new_scenario.id
+    assert dn_4._parent_ids == {task_3.id}
+    assert new_dn_4._parent_ids == {new_task_3.id}
+    assert dn_4._last_edit_date == new_dn_4._last_edit_date
+    assert dn_4._edits == new_dn_4._edits
+    assert len(dn_4._properties) == len(new_dn_4._properties)
+    for k, v in dn_4._properties.items():
+        assert new_dn_4._properties[k] == v
+
+
+def test_duplicate_to_new_cycle():
+    dn_cfg_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
+    dn_cfg_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
+    dn_cfg_3 = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
+    dn_cfg_4 = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
+    t_cfg_1 = Config.configure_task("task_1", identity, input=[dn_cfg_1], output=[dn_cfg_2])
+    t_cfg_2 = Config.configure_task("task_2", identity, input=[dn_cfg_2], output=[dn_cfg_3])
+    t_cfg_3 = Config.configure_task("task_3", identity, input=[dn_cfg_3], output=[dn_cfg_4])
+    s_cfg_1 = Config.configure_scenario("scenario_1", [t_cfg_1, t_cfg_2, t_cfg_3], frequency=Frequency.DAILY)
+    creation_date = datetime.now() - timedelta(days=2)
+    name = "original"
+    scenario = _ScenarioManager._create(s_cfg_1, creation_date, name)
+
+    assert len(_CycleManager._get_all()) == 1
+    assert len(_ScenarioManager._get_all()) == 1
+    assert len(_TaskManager._get_all()) == 3
+    assert len(_DataManager._get_all()) == 4
+
+    new_creation_date = datetime.now()
+    new_name = "new"
+    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(new_creation_date, new_name)
+
+    assert len(_CycleManager._get_all()) == 2
+    assert len(_ScenarioManager._get_all()) == 2
+    assert len(_TaskManager._get_all()) == 6
+    assert len(_DataManager._get_all()) == 7
+
+    # Check scenario attributes
+    assert scenario.id != new_scenario.id
+    assert scenario.config_id == new_scenario.config_id == "scenario_1"
+    assert scenario.name == name
+    assert new_scenario.name == new_name
+    assert scenario.creation_date == creation_date
+    assert new_scenario.creation_date == new_creation_date
+    assert scenario.cycle.id != new_scenario.cycle.id
+
+    # Check tasks attributes
+    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
+    task_1 = scenario.tasks["task_1"]
+    new_task_1 = new_scenario.tasks["task_1"]
+    assert task_1 != new_task_1
+    assert task_1.id != new_task_1.id
+    assert task_1._config_id == new_task_1._config_id == "task_1"
+    assert task_1._owner_id == scenario.cycle.id
+    assert new_task_1._owner_id == new_scenario.cycle.id
+    assert task_1._parent_ids == {scenario.id}
+    assert new_task_1._parent_ids == {new_scenario.id}
+    assert task_1._function == new_task_1._function
+    assert task_1._skippable == new_task_1._skippable is False
+    assert task_1._properties == new_task_1._properties == {}
+
+    task_2 = scenario.tasks["task_2"]
+    new_task_2 = new_scenario.tasks["task_2"]
+    assert task_2.id != new_task_2.id
+    assert task_2._config_id == new_task_2._config_id == "task_2"
+    assert task_2._owner_id == scenario.id
+    assert new_task_2._owner_id == new_scenario.id
+    assert task_2._parent_ids == {scenario.id}
+    assert new_task_2._parent_ids == {new_scenario.id}
+    assert task_2._function == new_task_2._function
+    assert task_2._skippable == new_task_2._skippable is False
+    assert task_2._properties == new_task_2._properties == {}
+
+    task_3 = scenario.tasks["task_3"]
+    new_task_3 = new_scenario.tasks["task_3"]
+    assert task_3.id != new_task_3.id
+    assert task_3._config_id == new_task_3._config_id == "task_3"
+    assert task_3._owner_id == scenario.id
+    assert new_task_3._owner_id == new_scenario.id
+    assert task_3._parent_ids == {scenario.id}
+    assert new_task_3._parent_ids == {new_scenario.id}
+    assert task_3._function == new_task_3._function
+    assert task_3._skippable == new_task_3._skippable is False
+    assert task_3._properties == new_task_3._properties == {}
+
+    # Check data node attributes
+    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
+    dn_1 = scenario.data_nodes["dn_1"]
+    new_dn_1 = new_scenario.data_nodes["dn_1"]
+    assert dn_1.id == new_dn_1.id
+    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
+    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
+    assert dn_1._owner_id == new_dn_1._owner_id is None
+    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
+    assert dn_1._last_edit_date == new_dn_1._last_edit_date
+    assert dn_1._edits == new_dn_1._edits
+    assert len(dn_1._properties) == len(new_dn_1._properties)
+    for k, v in dn_1._properties.items():
+        assert new_dn_1._properties[k] == v
+
+    dn_2 = scenario.dn_2
+    new_dn_2 = new_scenario.dn_2
+    assert dn_2.id != new_dn_2.id
+    assert dn_2._config_id == new_dn_2._config_id == "dn_2"
+    assert dn_2._scope == new_dn_2._scope == Scope.CYCLE
+    assert dn_2._owner_id == scenario.cycle.id
+    assert new_dn_2._owner_id == new_scenario.cycle.id
+    assert dn_2._parent_ids == {task_1.id, task_2.id}
+    assert new_dn_2._parent_ids == {new_task_1.id, new_task_2.id}
+    assert dn_2._last_edit_date == new_dn_2._last_edit_date
+    assert dn_2._edits == new_dn_2._edits
+    assert len(dn_2._properties) == len(new_dn_2._properties)
+    for k, v in dn_2._properties.items():
+        assert new_dn_2._properties[k] == v
+
+    dn_3 = scenario.data_nodes["dn_3"]
+    new_dn_3 = new_scenario.data_nodes["dn_3"]
+    assert dn_3.id != new_dn_3.id
+    assert dn_3._config_id == new_dn_3._config_id == "dn_3"
+    assert dn_3._scope == new_dn_3._scope == Scope.SCENARIO
+    assert dn_3._owner_id == scenario.id
+    assert new_dn_3._owner_id == new_scenario.id
+    assert dn_3._parent_ids == {task_2.id, task_3.id}
+    assert new_dn_3._parent_ids == {new_task_2.id, new_task_3.id}
+    assert dn_3._last_edit_date == new_dn_3._last_edit_date
+    assert dn_3._edits == new_dn_3._edits
+    assert len(dn_3._properties) == len(new_dn_3._properties)
+    for k, v in dn_3._properties.items():
+        assert new_dn_3._properties[k] == v
+
+    dn_4 = scenario.data_nodes["dn_4"]
+    new_dn_4 = new_scenario.data_nodes["dn_4"]
+    assert dn_4.id != new_dn_4.id
+    assert dn_4._config_id == new_dn_4._config_id == "dn_4"
+    assert dn_4._scope == new_dn_4._scope == Scope.SCENARIO
+    assert dn_4._owner_id == scenario.id
+    assert new_dn_4._owner_id == new_scenario.id
+    assert dn_4._parent_ids == {task_3.id}
+    assert new_dn_4._parent_ids == {new_task_3.id}
+    assert dn_4._last_edit_date == new_dn_4._last_edit_date
+    assert dn_4._edits == new_dn_4._edits
+    assert len(dn_4._properties) == len(new_dn_4._properties)
+    for k, v in dn_4._properties.items():
+        assert new_dn_4._properties[k] == v
+
+
+def test_duplicate_to_new_cycle_with_existing_scenario():
+    dn_cfg_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
+    dn_cfg_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
+    dn_cfg_3 = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
+    dn_cfg_4 = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
+    t_cfg_1 = Config.configure_task("task_1", identity, input=[dn_cfg_1], output=[dn_cfg_2])
+    t_cfg_2 = Config.configure_task("task_2", identity, input=[dn_cfg_2], output=[dn_cfg_3])
+    t_cfg_3 = Config.configure_task("task_3", identity, input=[dn_cfg_3], output=[dn_cfg_4])
+    s_cfg_1 = Config.configure_scenario("scenario_1", [t_cfg_1, t_cfg_2, t_cfg_3], frequency=Frequency.DAILY)
+    creation_date = datetime.now() - timedelta(days=2)
+    new_creation_date = datetime.now()
+    name = "original"
+    existing_name = "existing"
+    new_name = "new"
+
+    scenario = _ScenarioManager._create(s_cfg_1, creation_date, name)
+    existing_scenario = _ScenarioManager._create(s_cfg_1, new_creation_date, existing_name)
+
+    assert len(_CycleManager._get_all()) == 2
+    assert len(_ScenarioManager._get_all()) == 2
+    assert len(_TaskManager._get_all()) == 6
+    assert len(_DataManager._get_all()) == 7
+    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(new_creation_date, new_name)
+
+    assert len(_CycleManager._get_all()) == 2
+    assert len(_ScenarioManager._get_all()) == 3
+    assert len(_TaskManager._get_all()) == 8
+    assert len(_DataManager._get_all()) == 9
+
+    # Check scenario attributes
+    assert scenario.id != new_scenario.id != existing_scenario.id
+    assert scenario.config_id == new_scenario.config_id == "scenario_1"
+    assert scenario.name == name
+    assert new_scenario.name == new_name
+    assert scenario.creation_date == creation_date
+    assert new_scenario.creation_date == new_creation_date
+    assert scenario.cycle.id != new_scenario.cycle.id == existing_scenario.cycle.id
+
+    # Check tasks attributes
+    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
+    task_1 = scenario.tasks["task_1"]
+    existing_task_1 = existing_scenario.tasks["task_1"]
+    new_task_1 = new_scenario.tasks["task_1"]
+    assert existing_task_1 == new_task_1
+    assert existing_task_1.id == new_task_1.id
+    assert task_1._config_id == existing_task_1._config_id == new_task_1._config_id == "task_1"
+    assert task_1._owner_id == scenario.cycle.id
+    assert existing_task_1._owner_id == new_task_1._owner_id == existing_scenario.cycle.id
+    assert task_1._parent_ids == {scenario.id}
+    assert existing_task_1._parent_ids == new_task_1._parent_ids == {existing_scenario.id, new_scenario.id}
+    assert task_1._function == existing_task_1._function == new_task_1._function
+    assert task_1._skippable == existing_task_1._skippable == new_task_1._skippable is False
+    assert task_1._properties == existing_task_1._properties == new_task_1._properties == {}
+
+    task_2 = scenario.tasks["task_2"]
+    existing_task_2 = existing_scenario.tasks["task_2"]
+    new_task_2 = new_scenario.tasks["task_2"]
+    assert task_2.id != new_task_2.id
+    assert task_2._config_id == new_task_2._config_id == "task_2"
+    assert task_2._owner_id == scenario.id
+    assert new_task_2._owner_id == new_scenario.id
+    assert task_2._parent_ids == {scenario.id}
+    assert new_task_2._parent_ids == {new_scenario.id}
+    assert task_2._function == new_task_2._function
+    assert task_2._skippable == new_task_2._skippable is False
+    assert task_2._properties == new_task_2._properties == {}
+
+    task_3 = scenario.tasks["task_3"]
+    new_task_3 = new_scenario.tasks["task_3"]
+    assert task_3.id != new_task_3.id
+    assert task_3._config_id == new_task_3._config_id == "task_3"
+    assert task_3._owner_id == scenario.id
+    assert new_task_3._owner_id == new_scenario.id
+    assert task_3._parent_ids == {scenario.id}
+    assert new_task_3._parent_ids == {new_scenario.id}
+    assert task_3._function == new_task_3._function
+    assert task_3._skippable == new_task_3._skippable is False
+    assert task_3._properties == new_task_3._properties == {}
+
+    # Check data node attributes
+    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
+    dn_1 = scenario.data_nodes["dn_1"]
+    new_dn_1 = new_scenario.data_nodes["dn_1"]
+    assert dn_1.id == new_dn_1.id
+    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
+    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
+    assert dn_1._owner_id == new_dn_1._owner_id is None
+    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
+    assert dn_1._last_edit_date == new_dn_1._last_edit_date
+    assert dn_1._edits == new_dn_1._edits
+    assert len(dn_1._properties) == len(new_dn_1._properties)
+    for k, v in dn_1._properties.items():
+        assert new_dn_1._properties[k] == v
+
+    dn_2 = scenario.dn_2
+    existing_dn_2 = existing_scenario.dn_2
+    new_dn_2 = new_scenario.dn_2
+    assert dn_2.id != new_dn_2.id == existing_dn_2.id
+    assert dn_2._config_id == existing_dn_2._config_id == new_dn_2._config_id == "dn_2"
+    assert dn_2._scope == existing_dn_2._scope == new_dn_2._scope == Scope.CYCLE
+    assert dn_2._owner_id == scenario.cycle.id
+    assert existing_dn_2._owner_id == new_dn_2._owner_id == new_scenario.cycle.id
+    assert dn_2._parent_ids == {task_1.id, task_2.id}
+    assert existing_dn_2._parent_ids == new_dn_2._parent_ids == {new_task_1.id, new_task_2.id, existing_task_2.id}
+    assert dn_2._last_edit_date == new_dn_2._last_edit_date
+    assert dn_2._edits == new_dn_2._edits
+    assert len(dn_2._properties) == len(new_dn_2._properties)
+    for k, v in dn_2._properties.items():
+        if k == "path":
+            assert new_dn_2._properties[k] == existing_dn_2._properties[k] != v
+        else:
+            assert new_dn_2._properties[k] == existing_dn_2._properties[k] == v
+
+    dn_3 = scenario.data_nodes["dn_3"]
+    existing_dn_3 = existing_scenario.data_nodes["dn_3"]
+    new_dn_3 = new_scenario.data_nodes["dn_3"]
+    assert dn_3.id != new_dn_3.id != existing_dn_3.id
+    assert dn_3._config_id == new_dn_3._config_id == existing_dn_3._config_id == "dn_3"
+    assert dn_3._scope == new_dn_3._scope == existing_dn_3._scope == Scope.SCENARIO
+    assert dn_3._owner_id == scenario.id
+    assert existing_dn_3._owner_id != new_dn_3._owner_id == new_scenario.id
+    assert dn_3._parent_ids == {task_2.id, task_3.id}
+    assert new_dn_3._parent_ids == {new_task_2.id, new_task_3.id}
+    assert dn_3._last_edit_date == new_dn_3._last_edit_date
+    assert dn_3._edits == new_dn_3._edits
+    assert len(dn_3._properties) == len(new_dn_3._properties)
+    for k, v in dn_3._properties.items():
+        assert new_dn_3._properties[k] == v
+
+    dn_4 = scenario.data_nodes["dn_4"]
+    new_dn_4 = new_scenario.data_nodes["dn_4"]
+    assert dn_4.id != new_dn_4.id
+    assert dn_4._config_id == new_dn_4._config_id == "dn_4"
+    assert dn_4._scope == new_dn_4._scope == Scope.SCENARIO
+    assert dn_4._owner_id == scenario.id
+    assert new_dn_4._owner_id == new_scenario.id
+    assert dn_4._parent_ids == {task_3.id}
+    assert new_dn_4._parent_ids == {new_task_3.id}
+    assert dn_4._last_edit_date == new_dn_4._last_edit_date
+    assert dn_4._edits == new_dn_4._edits
+    assert len(dn_4._properties) == len(new_dn_4._properties)
+    for k, v in dn_4._properties.items():
+        assert new_dn_4._properties[k] == v
+
+
+def test_duplicate_with_all_global_dn():
+    dn_config_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
+    dn_config_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.GLOBAL)
+    task_config_1 = Config.configure_task("task_1", print, [dn_config_1], [dn_config_2])
+    scenario_config_1 = Config.configure_scenario("scenario_1", [task_config_1], frequency=Frequency.DAILY)
+    scenario = _ScenarioManager._create(scenario_config_1, datetime.now() - timedelta(days=10), "original")
+
+    assert len(_ScenarioManager._get_all()) == 1
+    assert len(_DataManager._get_all()) == 2
+    assert len(_TaskManager._get_all()) == 1
+
+    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(datetime.now(), "new")
+
+    assert len(_ScenarioManager._get_all()) == 2
+    assert len(_DataManager._get_all()) == 2
+    assert len(_TaskManager._get_all()) == 1
+
+    # Check scenario attributes
+    assert scenario.id != new_scenario.id
+    assert scenario.config_id == new_scenario.config_id == "scenario_1"
+    assert scenario.name == "original"
+    assert new_scenario.name == "new"
+    assert scenario.creation_date != new_scenario.creation_date
+    assert scenario.cycle.id != new_scenario.cycle.id
+
+    # Check tasks attributes
+    assert len(scenario.tasks) == len(new_scenario.tasks) == 1
+    task_1 = scenario.tasks["task_1"]
+    new_task_1 = new_scenario.tasks["task_1"]
+    assert task_1 == new_task_1
+    assert task_1.id == new_task_1.id
+    assert task_1._config_id == new_task_1._config_id == "task_1"
+    assert task_1._owner_id == new_task_1._owner_id is None
+    assert task_1._parent_ids == new_task_1._parent_ids == {scenario.id, new_scenario.id}
+    assert task_1._function == new_task_1._function
+    assert task_1._skippable == new_task_1._skippable is False
+    assert task_1._properties == new_task_1._properties == {}
+
+    # Check data node attributes
+    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 2
+    dn_1 = scenario.data_nodes["dn_1"]
+    new_dn_1 = new_scenario.data_nodes["dn_1"]
+    assert dn_1.id == new_dn_1.id
+    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
+    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
+    assert dn_1._owner_id == new_dn_1._owner_id is None
+    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
+    assert dn_1._last_edit_date == new_dn_1._last_edit_date
+    assert dn_1._edits == new_dn_1._edits
+    assert len(dn_1._properties) == len(new_dn_1._properties)
+    for k, v in dn_1._properties.items():
+        assert new_dn_1._properties[k] == v
+
+    # Check second data node attributes
+    dn_2 = scenario.dn_2
+    new_dn_2 = new_scenario.dn_2
+    assert dn_2.id == new_dn_2.id
+    assert dn_2._config_id == new_dn_2._config_id == "dn_2"
+    assert dn_2._scope == new_dn_2._scope == Scope.GLOBAL
+    assert dn_2._owner_id == new_dn_2._owner_id is None
+    assert dn_2._parent_ids == {task_1.id}
+    assert dn_2._last_edit_date == new_dn_2._last_edit_date
+    assert dn_2._edits == new_dn_2._edits
+    assert len(dn_2._properties) == len(new_dn_2._properties)
+    for k, v in dn_2._properties.items():
+        assert new_dn_2._properties[k] == v
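
Taken together, these tests pin down the duplicator's scope rules: GLOBAL-scoped entities are always shared, CYCLE-scoped entities (and tasks owned by the cycle) are shared only when the duplicate lands in the same cycle or can be reused from an existing scenario in the target cycle, and SCENARIO-scoped entities are always recreated with fresh ids. A condensed sketch of driving the service directly; the positional second argument selects which data node configs also get their data copied (True for all, False for none, or an explicit set of config ids):

    from datetime import datetime

    from taipy import Config
    from taipy.core.scenario._scenario_duplicator import _ScenarioDuplicator
    from taipy.core.scenario._scenario_manager import _ScenarioManager

    dn_cfg = Config.configure_pickle_data_node("dn_1")
    task_cfg = Config.configure_task("task_1", print, [dn_cfg], [])
    scenario_cfg = Config.configure_scenario("scenario_1", [task_cfg])
    scenario = _ScenarioManager._create(scenario_cfg)

    # Duplicate the graph but copy stored data only for "dn_1"; the new
    # creation date decides which cycle the duplicate belongs to.
    new_scenario = _ScenarioDuplicator(scenario, {"dn_1"}).duplicate(datetime.now(), "new")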

+ 44 - 5
tests/core/scenario/test_scenario_manager.py

@@ -11,6 +11,7 @@
 
 from datetime import datetime, timedelta
 from typing import Callable, Iterable, Optional
+from unittest import mock
 from unittest.mock import ANY, patch
 
 import freezegun
@@ -41,7 +42,8 @@ from taipy.core.exceptions.exceptions import (
     UnauthorizedTagError,
 )
 from taipy.core.job._job_manager import _JobManager
-from taipy.core.reason import WrongConfigType
+from taipy.core.reason import EntityDoesNotExist, ReasonCollection, WrongConfigType
+from taipy.core.scenario._scenario_duplicator import _ScenarioDuplicator
 from taipy.core.scenario._scenario_manager import _ScenarioManager
 from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
 from taipy.core.scenario.scenario import Scenario
@@ -385,14 +387,14 @@ def test_can_create():
     reasons = _ScenarioManager._can_create(task_config)
     assert bool(reasons) is False
     assert reasons._reasons[task_config.id] == {WrongConfigType(task_config.id, ScenarioConfig.__name__)}
-    assert str(list(reasons._reasons[task_config.id])[0]) == 'Object "task" must be a valid ScenarioConfig'
+    assert str(list(reasons._reasons[task_config.id])[0]) == "Object 'task' must be a valid ScenarioConfig"
     with pytest.raises(AttributeError):
         _ScenarioManager._create(task_config)
 
     reasons = _ScenarioManager._can_create(1)
     assert bool(reasons) is False
     assert reasons._reasons["1"] == {WrongConfigType(1, ScenarioConfig.__name__)}
-    assert str(list(reasons._reasons["1"])[0]) == 'Object "1" must be a valid ScenarioConfig'
+    assert str(list(reasons._reasons["1"])[0]) == "Object '1' must be a valid ScenarioConfig"
     with pytest.raises(AttributeError):
         _ScenarioManager._create(1)
 
@@ -406,7 +408,7 @@ def test_is_deletable():
 
     rc = _ScenarioManager._is_deletable("some_scenario")
     assert not rc
-    assert "Entity some_scenario does not exist in the repository." in rc.reasons
+    assert "Entity 'some_scenario' does not exist in the repository." in rc.reasons
 
     assert len(_ScenarioManager._get_all()) == 2
     assert scenario_1_primary.is_primary
@@ -1049,7 +1051,7 @@ def test_is_submittable():
 
     rc = _ScenarioManager._is_submittable("some_scenario")
     assert not rc
-    assert "Entity some_scenario does not exist in the repository." in rc.reasons
+    assert "Entity 'some_scenario' does not exist in the repository." in rc.reasons
 
     assert len(_ScenarioManager._get_all()) == 1
     assert _ScenarioManager._is_submittable(scenario)
@@ -1553,3 +1555,40 @@ def test_filter_scenarios_by_creation_datetime():
     )
     assert len(filtered_scenarios) == 1
     assert [s_1_1] == filtered_scenarios
+
+
+def test_can_duplicate_scenario():
+    dn_config = Config.configure_pickle_data_node("dn")
+    task_config = Config.configure_task("task_1", print, [dn_config])
+    scenario_config = Config.configure_scenario("scenario_1", [task_config])
+    scenario = _ScenarioManager._create(scenario_config)
+
+    reasons = _ScenarioManager._can_duplicate(scenario)
+    assert bool(reasons)
+    assert reasons._reasons == {}
+
+    reasons = _ScenarioManager._can_duplicate(scenario.id)
+    assert bool(reasons)
+    assert reasons._reasons == {}
+
+    reasons = _ScenarioManager._can_duplicate("WRONG_ID")
+    assert not bool(reasons)
+    assert reasons._reasons["WRONG_ID"] == {EntityDoesNotExist("WRONG_ID")}
+    assert str(list(reasons._reasons["WRONG_ID"])[0]) == "Entity 'WRONG_ID' does not exist in the repository"
+
+
+def test_duplicate_scenario():
+    scenario = Scenario("config_id", set(), {}, set(), ScenarioId("scenario_id"))
+    with mock.patch.object(_ScenarioManager, "_can_duplicate", return_value=ReasonCollection()) as mock_can:
+        with mock.patch.object(_ScenarioDuplicator, "duplicate") as mock_duplicate:
+            _ScenarioManager._duplicate(scenario)
+            mock_can.assert_called_once_with(scenario)
+            mock_duplicate.assert_called_once_with(None, None)
+            mock_duplicate.reset_mock()
+            mock_can.reset_mock()
+
+            new_date = datetime.now()
+            new_name = "new_name"
+            _ScenarioManager._duplicate(scenario, new_date, new_name)
+            mock_can.assert_called_once_with(scenario)
+            mock_duplicate.assert_called_once_with(new_date, new_name)
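
The two mocks pin down the delegation contract: `_ScenarioManager._duplicate` is a thin guard that consults `_can_duplicate` and then hands off to the duplicator service. Roughly, as a sketch rather than the actual implementation (the failure branch is not exercised by this test, and the dedicated exception type, which the commit log only calls "a better exception", is assumed here):

    def _duplicate(scenario, creation_date=None, name=None):
        reasons = _ScenarioManager._can_duplicate(scenario)
        if not reasons:
            # The real code raises a dedicated exception from
            # taipy.core.exceptions; the exact class is assumed.
            raise Exception(reasons.reasons)
        return _ScenarioDuplicator(scenario).duplicate(creation_date, name)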

+ 1 - 1
tests/core/sequence/test_sequence_manager.py

@@ -208,7 +208,7 @@ def test_is_submittable():
 
     rc = _SequenceManager._is_submittable("some_sequence")
     assert not rc
-    assert "Entity some_sequence does not exist in the repository." in rc.reasons
+    assert "Entity 'some_sequence' does not exist in the repository." in rc.reasons
 
     scenario.add_sequences({"sequence": [task]})
     sequence = scenario.sequences["sequence"]

+ 1 - 1
tests/core/submission/test_submission_manager.py

@@ -153,7 +153,7 @@ def test_is_deletable():
 
     rc = submission_manager._is_deletable("some_submission")
     assert not rc
-    assert "Entity some_submission does not exist in the repository." in rc.reasons
+    assert "Entity 'some_submission' does not exist in the repository." in rc.reasons
 
     assert submission._submission_status == SubmissionStatus.SUBMITTED
     assert not submission.is_deletable()

+ 20 - 1
tests/core/task/test_task_manager.py

@@ -22,6 +22,7 @@ from taipy.core._version._version_manager import _VersionManager
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.in_memory import InMemoryDataNode
 from taipy.core.exceptions.exceptions import ModelNotFound, NonExistingTask
+from taipy.core.reason import EntityDoesNotExist
 from taipy.core.task._task_manager import _TaskManager
 from taipy.core.task._task_manager_factory import _TaskManagerFactory
 from taipy.core.task.task import Task
@@ -310,7 +311,7 @@ def test_is_submittable():
 
     rc = _TaskManager._is_submittable("some_task")
     assert not rc
-    assert "Entity some_task does not exist in the repository" in rc.reasons
+    assert "Entity 'some_task' does not exist in the repository" in rc.reasons
 
     assert len(_TaskManager._get_all()) == 1
     assert _TaskManager._is_submittable(task)
@@ -483,3 +484,21 @@ def test_get_scenarios_by_config_id_in_multiple_versions_environment():
 
 def _create_task_from_config(task_config, *args, **kwargs):
     return _TaskManager._bulk_get_or_create([task_config], *args, **kwargs)[0]
+
+def test_can_duplicate():
+    dn_config = Config.configure_pickle_data_node("dn", scope=Scope.SCENARIO)
+    task_config = Config.configure_task("task_1", print, [dn_config])
+    task = _TaskManager._bulk_get_or_create([task_config])[0]
+
+    reasons = _TaskManager._can_duplicate(task.id)
+    assert bool(reasons)
+    assert reasons._reasons == {}
+
+    reasons = _TaskManager._can_duplicate(task)
+    assert bool(reasons)
+    assert reasons._reasons == {}
+
+    reasons = _TaskManager._can_duplicate("1")
+    assert not bool(reasons)
+    assert reasons._reasons["1"] == {EntityDoesNotExist("1")}
+    assert str(list(reasons._reasons["1"])[0]) == "Entity '1' does not exist in the repository"

+ 26 - 0
tests/core/test_taipy.py

@@ -869,3 +869,29 @@ class TestTaipy:
         assert len(tp.get_scenarios()) == 5
         assert len(tp.get_entities_by_config_id(scenario_config_1.id)) == 3
         assert len(tp.get_entities_by_config_id(scenario_config_2.id)) == 2
+
+    def test_can_duplicate(self):
+        dn_config = Config.configure_in_memory_data_node("dn", 10)
+        task_config = Config.configure_task("task", print, [dn_config])
+        scenario_config = Config.configure_scenario("sc", {task_config}, [], Frequency.DAILY)
+
+        scenario = tp.create_scenario(scenario_config)
+        assert tp.can_duplicate(scenario)
+        assert not tp.can_duplicate("1")
+
+    def test_duplicate_scenario(self):
+        dn_config = Config.configure_in_memory_data_node("dn", 10)
+        task_config = Config.configure_task("task", print, [dn_config])
+        scenario_config = Config.configure_scenario("sc", {task_config}, [], Frequency.DAILY)
+
+        scenario = tp.create_scenario(scenario_config)
+
+        with mock.patch("taipy.core.scenario._scenario_manager._ScenarioManager._duplicate") as mck:
+            tp.duplicate_scenario(scenario)
+            mck.assert_called_once_with(scenario, None, None)
+        with mock.patch("taipy.core.scenario._scenario_manager._ScenarioManager._duplicate") as mck:
+            tp.duplicate_scenario(scenario, datetime.datetime(2022, 2, 5))
+            mck.assert_called_once_with(scenario, datetime.datetime(2022, 2, 5), None)
+        with mock.patch("taipy.core.scenario._scenario_manager._ScenarioManager._duplicate") as mck:
+            tp.duplicate_scenario(scenario, datetime.datetime(2022, 2, 5), "displayable_name")
+            mck.assert_called_once_with(scenario, datetime.datetime(2022, 2, 5), "displayable_name")