|
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from datetime import datetime, timedelta
- from taipy import Config, Frequency, Scope, Sequence
- from taipy.core.cycle._cycle_manager import _CycleManager
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.job._job_manager import _JobManager
- from taipy.core.scenario._scenario_duplicator import _ScenarioDuplicator
- from taipy.core.scenario._scenario_manager import _ScenarioManager
- from taipy.core.sequence._sequence_manager import _SequenceManager
- from taipy.core.submission._submission_manager import _SubmissionManager
- from taipy.core.task._task_manager import _TaskManager
def identity(x):
    """Return the argument unchanged."""
    return x
def test_constructor():
    """Check the scenario and the ``data_to_duplicate`` set stored by the constructor.

    The optional second argument is exercised as omitted, ``True``, ``False`` and as an
    explicit set of data node config ids.
    """
    pickle_dn_cfg = Config.configure_pickle_data_node("dn_1")
    extra_dn_cfg = Config.configure_data_node("additional_dn_1")
    task_cfg = Config.configure_task("task_1", print, [pickle_dn_cfg], [])
    scenario_cfg = Config.configure_scenario("scenario_1", [task_cfg], [extra_dn_cfg])
    scenario = _ScenarioManager._create(scenario_cfg)

    every_dn = {"dn_1", "additional_dn_1"}
    # Each case: (positional arguments following the scenario, expected data_to_duplicate).
    cases = [
        ((), every_dn),
        ((True,), every_dn),
        ((False,), set()),
        (({"dn_1"},), {"dn_1"}),
        (({"additional_dn_1"},), {"additional_dn_1"}),
    ]
    for extra_args, expected in cases:
        duplicator = _ScenarioDuplicator(scenario, *extra_args)
        assert duplicator.scenario == scenario
        assert duplicator.data_to_duplicate == expected
def test_duplicate_scenario_scoped_dns_no_cycle_one_sequence():
    """Duplicate a submitted scenario with no cycle, one sequence and only scenario-scoped data nodes.

    The assertions expect the task, the three data nodes and the sequence to be copied for the
    new scenario, while the submission and job counts stay unchanged.
    """
    in_dn_cfg = Config.configure_pickle_data_node("dn_1", default_data="data1", key1="value1")
    out_dn_cfg = Config.configure_pickle_data_node("dn_2", key1="value2")
    extra_dn_cfg = Config.configure_data_node("additional_dn_1")
    task_cfg = Config.configure_task(
        "task_1", print, input=[in_dn_cfg], output=[out_dn_cfg], skippable=True, k="v"
    )
    scenario_cfg = Config.configure_scenario("scenario_1", [task_cfg], [extra_dn_cfg])
    creation_date = datetime.now() - timedelta(minutes=1)
    name = "original"
    scenario = _ScenarioManager._create(scenario_cfg, creation_date, name)
    scenario.properties["key"] = "value"
    scenario.add_sequence("sequence_1", [scenario.task_1], {"key_seq": "value_seq"})
    scenario.submit()

    def assert_entity_counts(scenarios, data_nodes, tasks, sequences, cycles, submissions, jobs):
        assert len(_ScenarioManager._get_all()) == scenarios
        assert len(_DataManager._get_all()) == data_nodes
        assert len(_TaskManager._get_all()) == tasks
        assert len(_SequenceManager._get_all()) == sequences
        assert len(_CycleManager._get_all()) == cycles
        assert len(_SubmissionManager._get_all()) == submissions
        assert len(_JobManager._get_all()) == jobs

    assert_entity_counts(scenarios=1, data_nodes=3, tasks=1, sequences=1, cycles=0, submissions=1, jobs=1)

    new_scenario = _ScenarioDuplicator(scenario, False).duplicate()

    # Check scenario attributes
    assert scenario.id != new_scenario.id
    assert scenario.config_id == new_scenario.config_id == "scenario_1"
    assert scenario.name == name
    assert new_scenario.name == name
    assert scenario.creation_date == creation_date
    assert new_scenario.creation_date > creation_date
    assert scenario.cycle is None
    assert new_scenario.cycle is None

    # Check task attributes
    assert len(scenario.tasks) == len(new_scenario.tasks) == 1
    task = scenario.tasks["task_1"]
    new_task = new_scenario.tasks["task_1"]
    assert task.id != new_task.id
    assert task._config_id == new_task._config_id == "task_1"
    assert task._owner_id == scenario.id
    assert new_task._owner_id == new_scenario.id
    assert task._parent_ids == {scenario.id, Sequence._new_id("sequence_1", scenario.id)}
    assert new_task._parent_ids == {new_scenario.id, Sequence._new_id("sequence_1", new_scenario.id)}
    assert task._function == new_task._function
    assert task._skippable == new_task._skippable is True
    assert task._properties == new_task._properties == {"k": "v"}

    # Check data node attributes
    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 3

    def assert_dn(config_id, ppties, additional_dn=False):
        old_dn = scenario.data_nodes[config_id]
        copied_dn = new_scenario.data_nodes[config_id]
        # Additional data nodes hang off the scenario itself, the others off the task.
        old_parent, copied_parent = (scenario, new_scenario) if additional_dn else (task, new_task)
        assert old_dn.id != copied_dn.id
        assert old_dn._config_id == copied_dn._config_id == config_id
        assert old_dn._owner_id == scenario.id
        assert copied_dn._owner_id == new_scenario.id
        assert old_dn._parent_ids == {old_parent.id}
        assert copied_dn._parent_ids == {copied_parent.id}
        assert old_dn._scope == copied_dn._scope == Scope.SCENARIO
        assert len(old_dn._properties) == len(copied_dn._properties)
        for key, value in old_dn._properties.items():
            assert copied_dn._properties[key] == value
        for key, value in ppties.items():
            assert old_dn._properties[key] == copied_dn._properties[key] == value
        # NOTE(review): assertions on the last edit date and on the edits of both data nodes
        # were left disabled here - confirm the expected values before enabling them.

    assert_dn("dn_1", {"key1": "value1"})
    assert_dn("dn_2", {"key1": "value2"})
    assert_dn("additional_dn_1", {}, additional_dn=True)

    # Check sequence attributes
    assert len(scenario.sequences) == len(new_scenario.sequences) == 1
    sequence = scenario.sequences["sequence_1"]
    new_sequence = new_scenario.sequences["sequence_1"]
    assert sequence.id != new_sequence.id
    assert len(sequence._tasks) == len(new_sequence._tasks) == 1
    assert sequence.tasks["task_1"].id == task.id
    assert new_sequence.tasks["task_1"].id == new_task.id
    assert sequence._owner_id == scenario.id
    assert new_sequence._owner_id == new_scenario.id
    assert sequence._parent_ids == {scenario.id}
    assert new_sequence._parent_ids == {new_scenario.id}
    assert len(sequence._properties) == len(new_sequence._properties)
    for key, value in sequence._properties.items():
        assert new_sequence._properties[key] == value
    assert sequence._properties["key_seq"] == new_sequence._properties["key_seq"] == "value_seq"

    # Check cycles, submissions and jobs are not duplicated
    assert_entity_counts(scenarios=2, data_nodes=6, tasks=2, sequences=2, cycles=0, submissions=1, jobs=1)
def test_duplicate_same_cycle():
    """Duplicate a daily scenario with its own creation date, so the copy stays in the same cycle.

    The assertions expect the global and cycle-scoped entities (dn_1, dn_2, task_1) to be shared
    by both scenarios and the scenario-scoped ones (dn_3, dn_4, task_2, task_3) to be copied.
    """
    global_dn_cfg = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
    cycle_dn_cfg = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
    first_scenario_dn_cfg = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
    second_scenario_dn_cfg = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
    task_cfgs = [
        Config.configure_task("task_1", identity, input=[global_dn_cfg], output=[cycle_dn_cfg]),
        Config.configure_task("task_2", identity, input=[cycle_dn_cfg], output=[first_scenario_dn_cfg]),
        Config.configure_task("task_3", identity, input=[first_scenario_dn_cfg], output=[second_scenario_dn_cfg]),
    ]
    scenario_cfg = Config.configure_scenario("scenario_1", task_cfgs, frequency=Frequency.DAILY)
    creation_date = datetime.now() - timedelta(days=2)
    name = "original"
    scenario = _ScenarioManager._create(scenario_cfg, creation_date, name)

    assert len(_ScenarioManager._get_all()) == 1
    assert len(_TaskManager._get_all()) == 3
    assert len(_DataManager._get_all()) == 4

    new_name = "new"
    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(creation_date, new_name)

    assert len(_ScenarioManager._get_all()) == 2
    assert len(_TaskManager._get_all()) == 5
    assert len(_DataManager._get_all()) == 6

    def assert_same_edits_and_properties(old_dn, copied_dn):
        assert old_dn._last_edit_date == copied_dn._last_edit_date
        assert old_dn._edits == copied_dn._edits
        assert len(old_dn._properties) == len(copied_dn._properties)
        for key, value in old_dn._properties.items():
            assert copied_dn._properties[key] == value

    def assert_copied_task(config_id):
        old_task = scenario.tasks[config_id]
        copied_task = new_scenario.tasks[config_id]
        assert old_task.id != copied_task.id
        assert old_task._config_id == copied_task._config_id == config_id
        assert old_task._owner_id == scenario.id
        assert copied_task._owner_id == new_scenario.id
        assert old_task._parent_ids == {scenario.id}
        assert copied_task._parent_ids == {new_scenario.id}
        assert old_task._function == copied_task._function
        assert old_task._skippable == copied_task._skippable is False
        assert old_task._properties == copied_task._properties == {}
        return old_task, copied_task

    def assert_copied_dn(config_id, old_parents, copied_parents):
        old_dn = scenario.data_nodes[config_id]
        copied_dn = new_scenario.data_nodes[config_id]
        assert old_dn.id != copied_dn.id
        assert old_dn._config_id == copied_dn._config_id == config_id
        assert old_dn._scope == copied_dn._scope == Scope.SCENARIO
        assert old_dn._owner_id == scenario.id
        assert copied_dn._owner_id == new_scenario.id
        assert old_dn._parent_ids == old_parents
        assert copied_dn._parent_ids == copied_parents
        assert_same_edits_and_properties(old_dn, copied_dn)

    # Check scenario attributes
    assert scenario.id != new_scenario.id
    assert scenario.config_id == new_scenario.config_id == "scenario_1"
    assert scenario.name == name
    assert new_scenario.name == new_name
    assert scenario.creation_date == new_scenario.creation_date == creation_date
    assert scenario.cycle.id == new_scenario.cycle.id
    assert len(scenario.sequences) == len(new_scenario.sequences) == 0

    # Check tasks attributes
    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
    task_1 = scenario.tasks["task_1"]
    new_task_1 = new_scenario.tasks["task_1"]
    assert task_1 == new_task_1
    assert task_1.id == new_task_1.id
    assert task_1._config_id == new_task_1._config_id == "task_1"
    assert task_1._owner_id == new_task_1._owner_id == scenario.cycle.id
    assert task_1._parent_ids == new_task_1._parent_ids == {scenario.id, new_scenario.id}
    assert task_1._function == new_task_1._function
    assert task_1._skippable == new_task_1._skippable is False
    assert task_1._properties == new_task_1._properties == {}
    task_2, new_task_2 = assert_copied_task("task_2")
    task_3, new_task_3 = assert_copied_task("task_3")

    # Check data node attributes
    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
    dn_1 = scenario.data_nodes["dn_1"]
    new_dn_1 = new_scenario.data_nodes["dn_1"]
    assert dn_1.id == new_dn_1.id
    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
    assert dn_1._owner_id == new_dn_1._owner_id is None
    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
    assert_same_edits_and_properties(dn_1, new_dn_1)

    dn_2 = scenario.dn_2
    new_dn_2 = new_scenario.dn_2
    assert dn_2.id == new_dn_2.id
    assert dn_2._config_id == new_dn_2._config_id == "dn_2"
    assert dn_2._scope == new_dn_2._scope == Scope.CYCLE
    assert dn_2._owner_id == new_dn_2._owner_id == scenario.cycle.id
    assert dn_2._parent_ids == {task_1.id, task_2.id, new_task_2.id}
    assert new_dn_2._parent_ids == {task_1.id, task_2.id, new_task_2.id}
    assert_same_edits_and_properties(dn_2, new_dn_2)

    assert_copied_dn("dn_3", {task_2.id, task_3.id}, {new_task_2.id, new_task_3.id})
    assert_copied_dn("dn_4", {task_3.id}, {new_task_3.id})
def test_duplicate_to_new_cycle():
    """Duplicate a daily scenario with a creation date two days later, so the copy gets a new cycle.

    The assertions expect only the global data node (dn_1) to be shared; the cycle-scoped and
    scenario-scoped data nodes and all three tasks are expected to be copied.
    """
    global_dn_cfg = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
    cycle_dn_cfg = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
    first_scenario_dn_cfg = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
    second_scenario_dn_cfg = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
    task_cfgs = [
        Config.configure_task("task_1", identity, input=[global_dn_cfg], output=[cycle_dn_cfg]),
        Config.configure_task("task_2", identity, input=[cycle_dn_cfg], output=[first_scenario_dn_cfg]),
        Config.configure_task("task_3", identity, input=[first_scenario_dn_cfg], output=[second_scenario_dn_cfg]),
    ]
    scenario_cfg = Config.configure_scenario("scenario_1", task_cfgs, frequency=Frequency.DAILY)
    creation_date = datetime.now() - timedelta(days=2)
    name = "original"
    scenario = _ScenarioManager._create(scenario_cfg, creation_date, name)

    assert len(_CycleManager._get_all()) == 1
    assert len(_ScenarioManager._get_all()) == 1
    assert len(_TaskManager._get_all()) == 3
    assert len(_DataManager._get_all()) == 4

    new_creation_date = datetime.now()
    new_name = "new"
    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(new_creation_date, new_name)

    assert len(_CycleManager._get_all()) == 2
    assert len(_ScenarioManager._get_all()) == 2
    assert len(_TaskManager._get_all()) == 6
    assert len(_DataManager._get_all()) == 7

    def assert_same_edits_and_properties(old_dn, copied_dn):
        assert old_dn._last_edit_date == copied_dn._last_edit_date
        assert old_dn._edits == copied_dn._edits
        assert len(old_dn._properties) == len(copied_dn._properties)
        for key, value in old_dn._properties.items():
            assert copied_dn._properties[key] == value

    def assert_copied_task(old_task, copied_task, config_id, old_owner_id, copied_owner_id):
        assert old_task.id != copied_task.id
        assert old_task._config_id == copied_task._config_id == config_id
        assert old_task._owner_id == old_owner_id
        assert copied_task._owner_id == copied_owner_id
        assert old_task._parent_ids == {scenario.id}
        assert copied_task._parent_ids == {new_scenario.id}
        assert old_task._function == copied_task._function
        assert old_task._skippable == copied_task._skippable is False
        assert old_task._properties == copied_task._properties == {}

    def assert_copied_dn(old_dn, copied_dn, config_id, scope, owner_ids, old_parents, copied_parents):
        old_owner_id, copied_owner_id = owner_ids
        assert old_dn.id != copied_dn.id
        assert old_dn._config_id == copied_dn._config_id == config_id
        assert old_dn._scope == copied_dn._scope == scope
        assert old_dn._owner_id == old_owner_id
        assert copied_dn._owner_id == copied_owner_id
        assert old_dn._parent_ids == old_parents
        assert copied_dn._parent_ids == copied_parents
        assert_same_edits_and_properties(old_dn, copied_dn)

    # Check scenario attributes
    assert scenario.id != new_scenario.id
    assert scenario.config_id == new_scenario.config_id == "scenario_1"
    assert scenario.name == name
    assert new_scenario.name == new_name
    assert scenario.creation_date == creation_date
    assert new_scenario.creation_date == new_creation_date
    assert scenario.cycle.id != new_scenario.cycle.id

    # Check tasks attributes
    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
    task_1 = scenario.tasks["task_1"]
    new_task_1 = new_scenario.tasks["task_1"]
    assert task_1 != new_task_1
    assert_copied_task(task_1, new_task_1, "task_1", scenario.cycle.id, new_scenario.cycle.id)
    task_2 = scenario.tasks["task_2"]
    new_task_2 = new_scenario.tasks["task_2"]
    assert_copied_task(task_2, new_task_2, "task_2", scenario.id, new_scenario.id)
    task_3 = scenario.tasks["task_3"]
    new_task_3 = new_scenario.tasks["task_3"]
    assert_copied_task(task_3, new_task_3, "task_3", scenario.id, new_scenario.id)

    # Check data node attributes
    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
    dn_1 = scenario.data_nodes["dn_1"]
    new_dn_1 = new_scenario.data_nodes["dn_1"]
    assert dn_1.id == new_dn_1.id
    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
    assert dn_1._owner_id == new_dn_1._owner_id is None
    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
    assert_same_edits_and_properties(dn_1, new_dn_1)

    assert_copied_dn(
        scenario.dn_2,
        new_scenario.dn_2,
        "dn_2",
        Scope.CYCLE,
        (scenario.cycle.id, new_scenario.cycle.id),
        {task_1.id, task_2.id},
        {new_task_1.id, new_task_2.id},
    )
    assert_copied_dn(
        scenario.data_nodes["dn_3"],
        new_scenario.data_nodes["dn_3"],
        "dn_3",
        Scope.SCENARIO,
        (scenario.id, new_scenario.id),
        {task_2.id, task_3.id},
        {new_task_2.id, new_task_3.id},
    )
    assert_copied_dn(
        scenario.data_nodes["dn_4"],
        new_scenario.data_nodes["dn_4"],
        "dn_4",
        Scope.SCENARIO,
        (scenario.id, new_scenario.id),
        {task_3.id},
        {new_task_3.id},
    )
def test_duplicate_to_new_cycle_with_existing_scenario():
    """Duplicate a daily scenario into a cycle that already holds another scenario.

    Three scenarios are involved: the original one (two days old), an existing one created at
    the target date, and the duplicate created at that same target date. The assertions expect
    the duplicate to reuse the cycle-level entities of the existing scenario (task_1, dn_2) and
    the global dn_1, and to get its own copies of the scenario-scoped entities.
    """
    dn_cfg_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
    dn_cfg_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.CYCLE)
    dn_cfg_3 = Config.configure_pickle_data_node("dn_3", scope=Scope.SCENARIO)
    dn_cfg_4 = Config.configure_pickle_data_node("dn_4", scope=Scope.SCENARIO)
    t_cfg_1 = Config.configure_task("task_1", identity, input=[dn_cfg_1], output=[dn_cfg_2])
    t_cfg_2 = Config.configure_task("task_2", identity, input=[dn_cfg_2], output=[dn_cfg_3])
    t_cfg_3 = Config.configure_task("task_3", identity, input=[dn_cfg_3], output=[dn_cfg_4])
    s_cfg_1 = Config.configure_scenario("scenario_1", [t_cfg_1, t_cfg_2, t_cfg_3], frequency=Frequency.DAILY)
    creation_date = datetime.now() - timedelta(days=2)
    new_creation_date = datetime.now()
    name = "original"
    existing_name = "existing"
    new_name = "new"
    scenario = _ScenarioManager._create(s_cfg_1, creation_date, name)
    existing_scenario = _ScenarioManager._create(s_cfg_1, new_creation_date, existing_name)

    assert len(_CycleManager._get_all()) == 2
    assert len(_ScenarioManager._get_all()) == 2
    assert len(_TaskManager._get_all()) == 6
    assert len(_DataManager._get_all()) == 7

    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(new_creation_date, new_name)

    assert len(_CycleManager._get_all()) == 2
    assert len(_ScenarioManager._get_all()) == 3
    assert len(_TaskManager._get_all()) == 8
    assert len(_DataManager._get_all()) == 9

    # Check scenario attributes
    # A chained `a != b != c` never compares a with c, so distinctness is checked through a set.
    assert len({scenario.id, new_scenario.id, existing_scenario.id}) == 3
    assert scenario.config_id == new_scenario.config_id == "scenario_1"
    assert scenario.name == name
    assert new_scenario.name == new_name
    assert scenario.creation_date == creation_date
    assert new_scenario.creation_date == new_creation_date
    assert scenario.cycle.id != new_scenario.cycle.id == existing_scenario.cycle.id

    # Check tasks attributes
    assert len(scenario.tasks) == len(new_scenario.tasks) == 3
    task_1 = scenario.tasks["task_1"]
    existing_task_1 = existing_scenario.tasks["task_1"]
    new_task_1 = new_scenario.tasks["task_1"]
    assert existing_task_1 == new_task_1
    assert existing_task_1.id == new_task_1.id
    assert task_1._config_id == existing_task_1._config_id == new_task_1._config_id == "task_1"
    assert task_1._owner_id == scenario.cycle.id
    assert existing_task_1._owner_id == new_task_1._owner_id == existing_scenario.cycle.id
    assert task_1._parent_ids == {scenario.id}
    assert existing_task_1._parent_ids == new_task_1._parent_ids == {existing_scenario.id, new_scenario.id}
    assert task_1._function == existing_task_1._function == new_task_1._function
    assert task_1._skippable == existing_task_1._skippable == new_task_1._skippable is False
    assert task_1._properties == existing_task_1._properties == new_task_1._properties == {}

    task_2 = scenario.tasks["task_2"]
    existing_task_2 = existing_scenario.tasks["task_2"]
    new_task_2 = new_scenario.tasks["task_2"]
    assert task_2.id != new_task_2.id
    assert task_2._config_id == new_task_2._config_id == "task_2"
    assert task_2._owner_id == scenario.id
    assert new_task_2._owner_id == new_scenario.id
    assert task_2._parent_ids == {scenario.id}
    assert new_task_2._parent_ids == {new_scenario.id}
    assert task_2._function == new_task_2._function
    assert task_2._skippable == new_task_2._skippable is False
    assert task_2._properties == new_task_2._properties == {}

    task_3 = scenario.tasks["task_3"]
    new_task_3 = new_scenario.tasks["task_3"]
    assert task_3.id != new_task_3.id
    assert task_3._config_id == new_task_3._config_id == "task_3"
    assert task_3._owner_id == scenario.id
    assert new_task_3._owner_id == new_scenario.id
    assert task_3._parent_ids == {scenario.id}
    assert new_task_3._parent_ids == {new_scenario.id}
    assert task_3._function == new_task_3._function
    assert task_3._skippable == new_task_3._skippable is False
    assert task_3._properties == new_task_3._properties == {}

    # Check data node attributes
    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 4
    dn_1 = scenario.data_nodes["dn_1"]
    new_dn_1 = new_scenario.data_nodes["dn_1"]
    assert dn_1.id == new_dn_1.id
    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
    assert dn_1._owner_id == new_dn_1._owner_id is None
    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
    assert dn_1._last_edit_date == new_dn_1._last_edit_date
    assert dn_1._edits == new_dn_1._edits
    assert len(dn_1._properties) == len(new_dn_1._properties)
    for k, v in dn_1._properties.items():
        assert new_dn_1._properties[k] == v

    dn_2 = scenario.dn_2
    existing_dn_2 = existing_scenario.dn_2
    new_dn_2 = new_scenario.dn_2
    assert dn_2.id != new_dn_2.id == existing_dn_2.id
    assert dn_2._config_id == existing_dn_2._config_id == new_dn_2._config_id == "dn_2"
    assert dn_2._scope == existing_dn_2._scope == new_dn_2._scope == Scope.CYCLE
    assert dn_2._owner_id == scenario.cycle.id
    assert existing_dn_2._owner_id == new_dn_2._owner_id == new_scenario.cycle.id
    assert dn_2._parent_ids == {task_1.id, task_2.id}
    assert existing_dn_2._parent_ids == new_dn_2._parent_ids == {new_task_1.id, new_task_2.id, existing_task_2.id}
    assert dn_2._last_edit_date == new_dn_2._last_edit_date
    assert dn_2._edits == new_dn_2._edits
    assert len(dn_2._properties) == len(new_dn_2._properties)
    for k, v in dn_2._properties.items():
        if k == "path":
            # The "path" property is the only one expected to differ from the original dn_2.
            assert new_dn_2._properties[k] == existing_dn_2._properties[k] != v
        else:
            assert new_dn_2._properties[k] == existing_dn_2._properties[k] == v

    dn_3 = scenario.data_nodes["dn_3"]
    existing_dn_3 = existing_scenario.data_nodes["dn_3"]
    new_dn_3 = new_scenario.data_nodes["dn_3"]
    # Same pitfall as for the scenario ids: compare all three ids at once.
    assert len({dn_3.id, new_dn_3.id, existing_dn_3.id}) == 3
    assert dn_3._config_id == new_dn_3._config_id == existing_dn_3._config_id == "dn_3"
    assert dn_3._scope == new_dn_3._scope == existing_dn_3._scope == Scope.SCENARIO
    assert dn_3._owner_id == scenario.id
    assert existing_dn_3._owner_id != new_dn_3._owner_id == new_scenario.id
    assert dn_3._parent_ids == {task_2.id, task_3.id}
    assert new_dn_3._parent_ids == {new_task_2.id, new_task_3.id}
    assert dn_3._last_edit_date == new_dn_3._last_edit_date
    assert dn_3._edits == new_dn_3._edits
    assert len(dn_3._properties) == len(new_dn_3._properties)
    for k, v in dn_3._properties.items():
        assert new_dn_3._properties[k] == v

    dn_4 = scenario.data_nodes["dn_4"]
    new_dn_4 = new_scenario.data_nodes["dn_4"]
    assert dn_4.id != new_dn_4.id
    assert dn_4._config_id == new_dn_4._config_id == "dn_4"
    assert dn_4._scope == new_dn_4._scope == Scope.SCENARIO
    assert dn_4._owner_id == scenario.id
    assert new_dn_4._owner_id == new_scenario.id
    assert dn_4._parent_ids == {task_3.id}
    assert new_dn_4._parent_ids == {new_task_3.id}
    assert dn_4._last_edit_date == new_dn_4._last_edit_date
    assert dn_4._edits == new_dn_4._edits
    assert len(dn_4._properties) == len(new_dn_4._properties)
    for k, v in dn_4._properties.items():
        assert new_dn_4._properties[k] == v
def test_duplicate_with_all_global_dn():
    """Duplicate a daily scenario whose two data nodes are both global.

    The duplicate is created ten days after the original, so it gets another cycle, but the
    assertions expect the task and both data nodes to be shared: no new task or data node
    entity is counted after the duplication.
    """
    dn_config_1 = Config.configure_pickle_data_node("dn_1", scope=Scope.GLOBAL)
    dn_config_2 = Config.configure_pickle_data_node("dn_2", scope=Scope.GLOBAL)
    task_config_1 = Config.configure_task("task_1", print, [dn_config_1], [dn_config_2])
    scenario_config_1 = Config.configure_scenario("scenario_1", [task_config_1], frequency=Frequency.DAILY)
    scenario = _ScenarioManager._create(scenario_config_1, datetime.now() - timedelta(days=10), "original")

    assert len(_ScenarioManager._get_all()) == 1
    assert len(_DataManager._get_all()) == 2
    assert len(_TaskManager._get_all()) == 1

    new_scenario = _ScenarioDuplicator(scenario, False).duplicate(datetime.now(), "new")

    assert len(_ScenarioManager._get_all()) == 2
    assert len(_DataManager._get_all()) == 2
    assert len(_TaskManager._get_all()) == 1

    # Check scenario attributes
    assert scenario.id != new_scenario.id
    assert scenario.config_id == new_scenario.config_id == "scenario_1"
    assert scenario.name == "original"
    assert new_scenario.name == "new"
    assert scenario.creation_date != new_scenario.creation_date
    assert scenario.cycle.id != new_scenario.cycle.id

    # Check tasks attributes
    assert len(scenario.tasks) == len(new_scenario.tasks) == 1
    task_1 = scenario.tasks["task_1"]
    new_task_1 = new_scenario.tasks["task_1"]
    assert task_1 == new_task_1
    assert task_1.id == new_task_1.id
    assert task_1._config_id == new_task_1._config_id == "task_1"
    assert task_1._owner_id == new_task_1._owner_id is None
    assert task_1._parent_ids == new_task_1._parent_ids == {scenario.id, new_scenario.id}
    assert task_1._function == new_task_1._function
    assert task_1._skippable == new_task_1._skippable is False
    assert task_1._properties == new_task_1._properties == {}

    # Check data node attributes
    assert len(scenario.data_nodes) == len(new_scenario.data_nodes) == 2
    dn_1 = scenario.data_nodes["dn_1"]
    new_dn_1 = new_scenario.data_nodes["dn_1"]
    assert dn_1.id == new_dn_1.id
    assert dn_1._config_id == new_dn_1._config_id == "dn_1"
    assert dn_1._scope == new_dn_1._scope == Scope.GLOBAL
    assert dn_1._owner_id == new_dn_1._owner_id is None
    assert dn_1._parent_ids == new_dn_1._parent_ids == {task_1.id, new_task_1.id}
    assert dn_1._last_edit_date == new_dn_1._last_edit_date
    assert dn_1._edits == new_dn_1._edits
    assert len(dn_1._properties) == len(new_dn_1._properties)
    for k, v in dn_1._properties.items():
        assert new_dn_1._properties[k] == v

    dn_2 = scenario.dn_2
    new_dn_2 = new_scenario.dn_2
    assert dn_2.id == new_dn_2.id
    assert dn_2._config_id == new_dn_2._config_id == "dn_2"
    assert dn_2._scope == new_dn_2._scope == Scope.GLOBAL
    assert dn_2._owner_id == new_dn_2._owner_id is None
    # Check the parents on both sides, as done for dn_1 above.
    assert dn_2._parent_ids == new_dn_2._parent_ids == {task_1.id}
    assert dn_2._last_edit_date == new_dn_2._last_edit_date
    assert dn_2._edits == new_dn_2._edits
    assert len(dn_2._properties) == len(new_dn_2._properties)
    for k, v in dn_2._properties.items():
        assert new_dn_2._properties[k] == v
|