# Copyright 2021-2025 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import datetime
import os
from unittest import mock

import pytest

from taipy import Scope
from taipy.common.config import Config
from taipy.common.config.exceptions.exceptions import ConfigurationUpdateBlocked
from taipy.core import MongoDefaultDocument
from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
from taipy.core.config import DataNodeConfig
from taipy.core.config.job_config import JobConfig


def test_data_node_config_default_parameter():
    csv_dn_cfg = Config.configure_data_node("data_node_1", "csv")
    assert csv_dn_cfg.scope == Scope.SCENARIO
    assert csv_dn_cfg.has_header is True
    assert csv_dn_cfg.exposed_type == "pandas"
    assert csv_dn_cfg.validity_period is None

    json_dn_cfg = Config.configure_data_node("data_node_2", "json")
    assert json_dn_cfg.scope == Scope.SCENARIO
    assert json_dn_cfg.validity_period is None

    parquet_dn_cfg = Config.configure_data_node("data_node_3", "parquet")
    assert parquet_dn_cfg.scope == Scope.SCENARIO
    assert parquet_dn_cfg.engine == "pyarrow"
    assert parquet_dn_cfg.compression == "snappy"
    assert parquet_dn_cfg.exposed_type == "pandas"
    assert parquet_dn_cfg.validity_period is None

    excel_dn_cfg = Config.configure_data_node("data_node_4", "excel")
    assert excel_dn_cfg.scope == Scope.SCENARIO
    assert excel_dn_cfg.has_header is True
    assert excel_dn_cfg.exposed_type == "pandas"
    assert excel_dn_cfg.validity_period is None

    generic_dn_cfg = Config.configure_data_node("data_node_5", "generic")
    assert generic_dn_cfg.scope == Scope.SCENARIO
    assert generic_dn_cfg.validity_period is None

    in_memory_dn_cfg = Config.configure_data_node("data_node_6", "in_memory")
    assert in_memory_dn_cfg.scope == Scope.SCENARIO
    assert in_memory_dn_cfg.validity_period is None

    pickle_dn_cfg = Config.configure_data_node("data_node_7", "pickle")
    assert pickle_dn_cfg.scope == Scope.SCENARIO
    assert pickle_dn_cfg.validity_period is None

    sql_table_dn_cfg = Config.configure_data_node(
        "data_node_8", "sql_table", db_name="test", db_engine="mssql", table_name="test"
    )
    assert sql_table_dn_cfg.scope == Scope.SCENARIO
    assert sql_table_dn_cfg.db_host == "localhost"
    assert sql_table_dn_cfg.db_port == 1433
    assert sql_table_dn_cfg.db_driver == ""
    assert sql_table_dn_cfg.sqlite_file_extension == ".db"
    assert sql_table_dn_cfg.exposed_type == "pandas"
    assert sql_table_dn_cfg.validity_period is None

    sql_dn_cfg = Config.configure_data_node(
        "data_node_9", "sql", db_name="test", db_engine="mssql", read_query="test", write_query_builder=print
    )
    assert sql_dn_cfg.scope == Scope.SCENARIO
    assert sql_dn_cfg.db_host == "localhost"
    assert sql_dn_cfg.db_port == 1433
    assert sql_dn_cfg.db_driver == ""
    assert sql_dn_cfg.sqlite_file_extension == ".db"
    assert sql_dn_cfg.exposed_type == "pandas"
    assert sql_dn_cfg.validity_period is None

    mongo_dn_cfg = Config.configure_data_node(
        "data_node_10", "mongo_collection", db_name="test", collection_name="test"
    )
    assert mongo_dn_cfg.scope == Scope.SCENARIO
    assert mongo_dn_cfg.db_host == "localhost"
    assert mongo_dn_cfg.db_port == 27017
    assert mongo_dn_cfg.custom_document == MongoDefaultDocument
    assert mongo_dn_cfg.db_username == ""
    assert mongo_dn_cfg.db_password == ""
    assert mongo_dn_cfg.db_driver == ""
    assert mongo_dn_cfg.validity_period is None

    aws_s3_object_dn_cfg = Config.configure_data_node(
        "data_node_11",
        "s3_object",
        aws_access_key="test",
        aws_secret_access_key="test_secret",
        aws_s3_bucket_name="test_bucket",
        aws_s3_object_key="test_file.txt",
    )
    assert aws_s3_object_dn_cfg.scope == Scope.SCENARIO
    assert aws_s3_object_dn_cfg.aws_access_key == "test"
    assert aws_s3_object_dn_cfg.aws_secret_access_key == "test_secret"
    assert aws_s3_object_dn_cfg.aws_s3_bucket_name == "test_bucket"
    assert aws_s3_object_dn_cfg.aws_s3_object_key == "test_file.txt"
    assert aws_s3_object_dn_cfg.aws_region is None
    assert aws_s3_object_dn_cfg.aws_s3_object_parameters is None
    assert aws_s3_object_dn_cfg.validity_period is None
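

# A minimal companion sketch (not part of the original suite): the defaults
# asserted above can be overridden explicitly at configuration time. The id
# "data_node_override" is illustrative.
def test_data_node_config_override_default_parameter():
    csv_dn_cfg = Config.configure_data_node("data_node_override", "csv", scope=Scope.GLOBAL, has_header=False)
    assert csv_dn_cfg.scope == Scope.GLOBAL
    assert csv_dn_cfg.has_header is False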


def test_data_node_config_check(caplog):
    data_node_config = Config.configure_data_node("data_nodes1", "pickle")
    assert list(Config.data_nodes) == [DataNodeConfig._DEFAULT_KEY, data_node_config.id]

    data_node2_config = Config.configure_data_node("data_nodes2", "pickle")
    assert list(Config.data_nodes) == [DataNodeConfig._DEFAULT_KEY, data_node_config.id, data_node2_config.id]

    data_node3_config = Config.configure_data_node("data_nodes3", "csv", has_header=True, default_path="")
    assert list(Config.data_nodes) == [
        DataNodeConfig._DEFAULT_KEY,
        data_node_config.id,
        data_node2_config.id,
        data_node3_config.id,
    ]

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", storage_type="bar")
        Config.check()
    expected_error_message = (
        "`storage_type` field of DataNodeConfig `data_nodes` must be either csv, sql_table,"
        " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory. Current"
        ' value of property `storage_type` is "bar".'
    )
    assert expected_error_message in caplog.text

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", scope="bar")
        Config.check()
    expected_error_message = (
        "`scope` field of DataNodeConfig `data_nodes` must be populated with a Scope value."
        ' Current value of property `scope` is "bar".'
    )
    assert expected_error_message in caplog.text

    # Configuring a SQL data node without its required properties fails eagerly.
    with pytest.raises(TypeError):
        Config.configure_data_node("data_nodes", storage_type="sql")

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", storage_type="generic")
        Config.check()
    # Config.check() exits because the generic data node is missing its read/write
    # functions; caplog still contains the earlier "bar" error, which is why the
    # same message is asserted again below.
    expected_error_message = (
        "`storage_type` field of DataNodeConfig `data_nodes` must be either csv, sql_table,"
        " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory."
        ' Current value of property `storage_type` is "bar".'
    )
    assert expected_error_message in caplog.text
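

# A minimal sketch (not part of the original suite): a well-formed configuration
# is expected to pass Config.check() without raising SystemExit.
def test_data_node_config_check_passes():
    Config.configure_data_node("data_nodes_ok", "pickle")
    Config.check()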


def test_configure_data_node_from_another_configuration():
    d1_cfg = Config.configure_sql_table_data_node(
        "d1",
        db_username="foo",
        db_password="bar",
        db_name="db",
        db_engine="mssql",
        db_port=8080,
        db_host="somewhere",
        table_name="foo",
        scope=Scope.GLOBAL,
        foo="bar",
    )

    d2_cfg = Config.configure_data_node_from(
        source_configuration=d1_cfg,
        id="d2",
        table_name="table_2",
    )
    assert d2_cfg.id == "d2"
    assert d2_cfg.storage_type == "sql_table"
    assert d2_cfg.scope == Scope.GLOBAL
    assert d2_cfg.validity_period is None
    assert d2_cfg.db_username == "foo"
    assert d2_cfg.db_password == "bar"
    assert d2_cfg.db_name == "db"
    assert d2_cfg.db_engine == "mssql"
    assert d2_cfg.db_port == 8080
    assert d2_cfg.db_host == "somewhere"
    assert d2_cfg.table_name == "table_2"
    assert d2_cfg.foo == "bar"

    d3_cfg = Config.configure_data_node_from(
        source_configuration=d1_cfg,
        id="d3",
        scope=Scope.SCENARIO,
        validity_period=datetime.timedelta(days=1),
        table_name="table_3",
        foo="baz",
    )
    assert d3_cfg.id == "d3"
    assert d3_cfg.storage_type == "sql_table"
    assert d3_cfg.scope == Scope.SCENARIO
    assert d3_cfg.validity_period == datetime.timedelta(days=1)
    assert d3_cfg.db_username == "foo"
    assert d3_cfg.db_password == "bar"
    assert d3_cfg.db_name == "db"
    assert d3_cfg.db_engine == "mssql"
    assert d3_cfg.db_port == 8080
    assert d3_cfg.db_host == "somewhere"
    assert d3_cfg.table_name == "table_3"
    assert d3_cfg.foo == "baz"
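

# A minimal sketch (not part of the original suite): without keyword overrides,
# configure_data_node_from is expected to copy the source properties verbatim.
def test_configure_data_node_from_copies_source():
    src_cfg = Config.configure_data_node("src", "pickle", default_path="foo.p")
    dst_cfg = Config.configure_data_node_from(source_configuration=src_cfg, id="dst")
    assert dst_cfg.storage_type == "pickle"
    assert dst_cfg.default_path == "foo.p"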


def test_data_node_count():
    # Config.data_nodes always contains the built-in default entry, so each
    # configured data node adds one to a count that starts at 1.
    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2

    Config.configure_data_node("data_nodes2", "pickle")
    assert len(Config.data_nodes) == 3

    Config.configure_data_node("data_nodes3", "pickle")
    assert len(Config.data_nodes) == 4


def test_data_node_getitem():
    data_node_id = "data_nodes1"
    data_node_config = Config.configure_data_node(data_node_id, "pickle", default_path="foo.p")

    assert Config.data_nodes[data_node_id].id == data_node_config.id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == data_node_config.storage_type
    assert Config.data_nodes[data_node_id].scope == data_node_config.scope
    assert Config.data_nodes[data_node_id].properties == data_node_config.properties


def test_data_node_creation_no_duplication():
    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2

    # Re-configuring the same id does not register a second entry.
    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2


def test_data_node_create_with_datetime():
    data_node_config = Config.configure_data_node(
        id="datetime_data",
        my_property=datetime.datetime(1991, 1, 1),
        foo="hello",
        test=1,
        test_dict={"type": "Datetime", 2: "daw"},
    )
    assert data_node_config.foo == "hello"
    assert data_node_config.my_property == datetime.datetime(1991, 1, 1)
    assert data_node_config.test == 1
    assert data_node_config.test_dict.get("type") == "Datetime"


def test_data_node_with_env_variable_value():
    with mock.patch.dict(os.environ, {"FOO": "pickle", "BAR": "baz"}):
        Config.configure_data_node("data_node", storage_type="ENV[FOO]", prop="ENV[BAR]")
        # Public accessors resolve ENV[...] templates against os.environ, while
        # the underscore-prefixed attributes keep the raw template strings.
        assert Config.data_nodes["data_node"].prop == "baz"
        assert Config.data_nodes["data_node"].properties["prop"] == "baz"
        assert Config.data_nodes["data_node"]._properties["prop"] == "ENV[BAR]"
        assert Config.data_nodes["data_node"].storage_type == "pickle"
        assert Config.data_nodes["data_node"]._storage_type == "ENV[FOO]"


def test_data_node_with_env_variable_in_write_fct_args():
    def read_fct(): ...

    def write_fct(): ...

    with mock.patch.dict(os.environ, {"FOO": "bar", "BAZ": "qux"}):
        Config.configure_data_node(
            "data_node",
            storage_type="generic",
            read_fct=read_fct,
            write_fct=write_fct,
            write_fct_args=["ENV[FOO]", "my_param", "ENV[BAZ]"],
        )
        assert Config.data_nodes["data_node"].write_fct_args == ["bar", "my_param", "qux"]


def test_data_node_with_env_variable_in_read_fct_args():
    def read_fct(): ...

    def write_fct(): ...

    with mock.patch.dict(os.environ, {"FOO": "bar", "BAZ": "qux"}):
        Config.configure_data_node(
            "data_node",
            storage_type="generic",
            read_fct=read_fct,
            write_fct=write_fct,
            read_fct_args=["ENV[FOO]", "my_param", "ENV[BAZ]"],
        )
        assert Config.data_nodes["data_node"].read_fct_args == ["bar", "my_param", "qux"]


def test_block_datanode_config_update_in_development_mode():
    data_node_id = "data_node_id"
    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
    data_node_config = Config.configure_data_node(
        id=data_node_id,
        storage_type="pickle",
        default_path="foo.p",
        scope=Scope.SCENARIO,
    )

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}

    # Building the dispatcher blocks any further configuration update.
    _OrchestratorFactory._build_dispatcher()

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.storage_type = "foo"

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.scope = Scope.SCENARIO

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.properties = {"foo": "bar"}

    # The configuration is left untouched by the blocked updates.
    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}


def test_block_datanode_config_update_in_standalone_mode():
    data_node_id = "data_node_id"
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE)
    data_node_config = Config.configure_data_node(
        id=data_node_id,
        storage_type="pickle",
        default_path="foo.p",
        scope=Scope.SCENARIO,
    )

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}

    _OrchestratorFactory._build_dispatcher()

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.storage_type = "foo"

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.scope = Scope.SCENARIO

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.properties = {"foo": "bar"}

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}


def test_clean_config():
    dn1_config = Config.configure_data_node(
        id="id1",
        storage_type="csv",
        default_path="foo.p",
        scope=Scope.GLOBAL,
        validity_period=datetime.timedelta(2),
    )
    dn2_config = Config.configure_data_node(
        id="id2",
        storage_type="json",
        default_path="bar.json",
        scope=Scope.GLOBAL,
        validity_period=datetime.timedelta(2),
    )

    assert Config.data_nodes["id1"] is dn1_config
    assert Config.data_nodes["id2"] is dn2_config

    dn1_config._clean()
    dn2_config._clean()

    # Check that the same instances remain registered after _clean().
    assert Config.data_nodes["id1"] is dn1_config
    assert Config.data_nodes["id2"] is dn2_config

    # Check that the values match the default configuration, while the
    # instances themselves remain distinct from it.
    assert dn1_config.id == "id1"
    assert dn2_config.id == "id2"
    assert dn1_config.storage_type == dn2_config.storage_type == "pickle"
    assert dn1_config.scope == dn2_config.scope == Scope.SCENARIO
    assert dn1_config.validity_period is dn2_config.validity_period is None
    assert dn1_config.default_path is dn2_config.default_path is None
    assert dn1_config.properties == dn2_config.properties == {}


def test_normalize_path():
    # Windows-style backslashes in the configured path are normalized to
    # forward slashes.
    data_node_config = Config.configure_data_node(id="data_nodes1", storage_type="csv", path=r"data\file.csv")
    assert data_node_config.path == "data/file.csv"
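

# A minimal companion sketch (not part of the original suite): a POSIX-style
# path is expected to pass through normalization unchanged.
def test_normalize_posix_path():
    data_node_config = Config.configure_data_node(id="data_nodes2", storage_type="csv", path="data/file.csv")
    assert data_node_config.path == "data/file.csv"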
|