123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- from datetime import timedelta
- from unittest import mock
- from taipy.common.config import Config
- from taipy.common.config.common.frequency import Frequency
- from taipy.common.config.common.scope import Scope
- from taipy.core.config import DataNodeConfig, ScenarioConfig, TaskConfig
- from taipy.core.config.core_section import CoreSection
- from tests.core.utils.named_temporary_file import NamedTemporaryFile
- def test_write_configuration_file():
- expected_config = f"""
- [TAIPY]
- [JOB]
- mode = "standalone"
- max_nb_of_workers = "2:int"
- [CORE]
- root_folder = "./taipy/"
- storage_folder = "user_data/"
- taipy_storage_folder = ".taipy/"
- repository_type = "filesystem"
- read_entity_retry = "0:int"
- mode = "development"
- version_number = ""
- force = "False:bool"
- core_version = "{CoreSection._CURRENT_CORE_VERSION}"
- [DATA_NODE.default]
- storage_type = "in_memory"
- scope = "SCENARIO:SCOPE"
- validity_period = "1d0h0m0s:timedelta"
- custom = "default_custom_prop"
- [DATA_NODE.dn1]
- storage_type = "pickle"
- scope = "SCENARIO:SCOPE"
- validity_period = "1d0h0m0s:timedelta"
- custom = "custom property"
- default_data = "dn1"
- [DATA_NODE.dn2]
- storage_type = "ENV[FOO]"
- scope = "SCENARIO:SCOPE"
- validity_period = "2d0h0m0s:timedelta"
- foo = "bar"
- default_data = "dn2"
- baz = "ENV[QUX]"
- quux = "ENV[QUUZ]:bool"
- corge = [ "grault", "ENV[GARPLY]", "ENV[WALDO]:int", "3.0:float",]
- [DATA_NODE.dn3]
- storage_type = "ENV[FOO]"
- scope = "SCENARIO:SCOPE"
- validity_period = "1d0h0m0s:timedelta"
- foo = "bar"
- default_data = "dn3"
- quux = "ENV[QUUZ]:bool"
- [TASK.default]
- inputs = []
- outputs = []
- skippable = "False:bool"
- [TASK.t1]
- function = "builtins.print:function"
- inputs = [ "dn1:SECTION",]
- outputs = [ "dn2:SECTION",]
- skippable = "False:bool"
- description = "t1 description"
- [SCENARIO.default]
- tasks = []
- additional_data_nodes = []
- frequency = "QUARTERLY:FREQUENCY"
- owner = "Michel Platini"
- [SCENARIO.s1]
- tasks = [ "t1:SECTION",]
- additional_data_nodes = [ "dn3:SECTION",]
- frequency = "QUARTERLY:FREQUENCY"
- owner = "Raymond Kopa"
- [SCENARIO.default.comparators]
- [SCENARIO.default.sequences]
- [SCENARIO.s1.comparators]
- [SCENARIO.s1.sequences]
- sequence = [ "t1:SECTION",]
- """.strip()
- tf = NamedTemporaryFile()
- with mock.patch.dict(
- os.environ, {"FOO": "in_memory", "QUX": "qux", "QUUZ": "true", "GARPLY": "garply", "WALDO": "17"}
- ):
- Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
- Config.set_default_data_node_configuration(
- storage_type="in_memory",
- custom="default_custom_prop",
- validity_period=timedelta(1),
- )
- dn1_cfg_v2 = Config.configure_data_node(
- "dn1", storage_type="pickle", scope=Scope.SCENARIO, default_data="dn1", custom="custom property"
- )
- dn2_cfg_v2 = Config.configure_data_node(
- "dn2",
- storage_type="ENV[FOO]",
- validity_period=timedelta(2),
- foo="bar",
- default_data="dn2",
- baz="ENV[QUX]",
- quux="ENV[QUUZ]:bool",
- corge=("grault", "ENV[GARPLY]", "ENV[WALDO]:int", 3.0),
- )
- dn3_cfg_v2 = Config.configure_data_node(
- "dn3",
- storage_type="ENV[FOO]",
- foo="bar",
- default_data="dn3",
- quux="ENV[QUUZ]:bool",
- )
- assert dn2_cfg_v2.scope == Scope.SCENARIO
- t1_cfg_v2 = Config.configure_task("t1", print, dn1_cfg_v2, dn2_cfg_v2, description="t1 description")
- Config.set_default_scenario_configuration([], [], Frequency.QUARTERLY, owner="Michel Platini")
- Config.configure_scenario(
- "s1",
- task_configs=[t1_cfg_v2],
- additional_data_node_configs=[dn3_cfg_v2],
- frequency=Frequency.QUARTERLY,
- owner="Raymond Kopa",
- sequences={"sequence": [t1_cfg_v2]},
- )
- Config.backup(tf.filename)
- actual_config = tf.read().strip() # problem here
- assert actual_config == expected_config
- Config.override(tf.filename)
- tf2 = NamedTemporaryFile()
- Config.backup(tf2.filename)
- actual_config_2 = tf2.read().strip()
- assert actual_config_2 == expected_config
- def test_read_configuration_file():
- file_config = NamedTemporaryFile(
- """
- [DATA_NODE.default]
- has_header = true
- [DATA_NODE.my_datanode]
- path = "/data/csv"
- validity_period = "1d0h0m0s:timedelta"
- [DATA_NODE.my_datanode2]
- path = "/data2/csv"
- [DATA_NODE.my_datanode3]
- path = "/data3/csv"
- source = "local"
- [TASK.my_task]
- inputs = ["my_datanode:SECTION"]
- outputs = ["my_datanode2:SECTION"]
- description = "task description"
- [SCENARIO.my_scenario]
- tasks = [ "my_task:SECTION"]
- additional_data_nodes = ["my_datanode3:SECTION"]
- owner = "John Doe"
- [SCENARIO.my_scenario.sequences]
- sequence = [ "my_task:SECTION",]
- """
- )
- Config.configure_task("my_task", print)
- Config.override(file_config.filename)
- assert len(Config.data_nodes) == 4
- assert type(Config.data_nodes["my_datanode"]) is DataNodeConfig
- assert type(Config.data_nodes["my_datanode2"]) is DataNodeConfig
- assert type(Config.data_nodes["my_datanode3"]) is DataNodeConfig
- assert Config.data_nodes["my_datanode"].path == "/data/csv"
- assert Config.data_nodes["my_datanode2"].path == "/data2/csv"
- assert Config.data_nodes["my_datanode3"].path == "/data3/csv"
- assert Config.data_nodes["my_datanode"].id == "my_datanode"
- assert Config.data_nodes["my_datanode2"].id == "my_datanode2"
- assert Config.data_nodes["my_datanode3"].id == "my_datanode3"
- assert Config.data_nodes["my_datanode"].validity_period == timedelta(1)
- assert Config.data_nodes["my_datanode3"].source == "local"
- assert len(Config.tasks) == 2
- assert type(Config.tasks["my_task"]) is TaskConfig
- assert Config.tasks["my_task"].id == "my_task"
- assert Config.tasks["my_task"].description == "task description"
- assert Config.tasks["my_task"].function == print
- assert len(Config.tasks["my_task"].inputs) == 1
- assert type(Config.tasks["my_task"].inputs[0]) is DataNodeConfig
- assert Config.tasks["my_task"].inputs[0].path == "/data/csv"
- assert Config.tasks["my_task"].inputs[0].id == "my_datanode"
- assert len(Config.tasks["my_task"].outputs) == 1
- assert type(Config.tasks["my_task"].outputs[0]) is DataNodeConfig
- assert Config.tasks["my_task"].outputs[0].path == "/data2/csv"
- assert Config.tasks["my_task"].outputs[0].id == "my_datanode2"
- assert len(Config.scenarios) == 2
- assert type(Config.scenarios["my_scenario"]) is ScenarioConfig
- assert Config.scenarios["my_scenario"].id == "my_scenario"
- assert Config.scenarios["my_scenario"].owner == "John Doe"
- assert len(Config.scenarios["my_scenario"].tasks) == 1
- assert type(Config.scenarios["my_scenario"].tasks[0]) is TaskConfig
- assert len(Config.scenarios["my_scenario"].additional_data_nodes) == 1
- assert type(Config.scenarios["my_scenario"].additional_data_nodes[0]) is DataNodeConfig
- assert Config.scenarios["my_scenario"].tasks[0].id == "my_task"
- assert Config.scenarios["my_scenario"].tasks[0].description == "task description"
- assert Config.scenarios["my_scenario"].additional_data_nodes[0].id == "my_datanode3"
- assert Config.scenarios["my_scenario"].additional_data_nodes[0].source == "local"
- assert [task.id for task in Config.scenarios["my_scenario"].sequences["sequence"]] == [
- Config.scenarios["my_scenario"].tasks[0].id
- ]
|