# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import datetime
import os
from unittest import mock

import pytest

from taipy.common.config import Config
from taipy.common.config.common.scope import Scope
from taipy.common.config.exceptions.exceptions import ConfigurationUpdateBlocked
from taipy.core import MongoDefaultDocument
from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
from taipy.core.config import DataNodeConfig
from taipy.core.config.job_config import JobConfig
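

# The tests below assume a fresh configuration for each test; in the taipy test
# suite this is presumably provided by a conftest fixture that resets Config
# between tests.
#
# Every storage type defaults to SCENARIO scope and no validity period; tabular
# types additionally default to a "pandas" exposed type. A minimal usage sketch
# (the id and path are illustrative):
#     cfg = Config.configure_data_node("my_data", "csv", default_path="my_data.csv")
#     assert cfg.scope == Scope.SCENARIO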
def test_data_node_config_default_parameter():
    csv_dn_cfg = Config.configure_data_node("data_node_1", "csv")
    assert csv_dn_cfg.scope == Scope.SCENARIO
    assert csv_dn_cfg.has_header is True
    assert csv_dn_cfg.exposed_type == "pandas"
    assert csv_dn_cfg.validity_period is None

    json_dn_cfg = Config.configure_data_node("data_node_2", "json")
    assert json_dn_cfg.scope == Scope.SCENARIO
    assert json_dn_cfg.validity_period is None

    # parquet_dn_cfg = Config.configure_data_node("data_node_3", "parquet")
    # assert parquet_dn_cfg.scope == Scope.SCENARIO
    # assert parquet_dn_cfg.engine == "pyarrow"
    # assert parquet_dn_cfg.compression == "snappy"
    # assert parquet_dn_cfg.exposed_type == "pandas"
    # assert parquet_dn_cfg.validity_period is None

    excel_dn_cfg = Config.configure_data_node("data_node_4", "excel")
    assert excel_dn_cfg.scope == Scope.SCENARIO
    assert excel_dn_cfg.has_header is True
    assert excel_dn_cfg.exposed_type == "pandas"
    assert excel_dn_cfg.validity_period is None

    generic_dn_cfg = Config.configure_data_node("data_node_5", "generic")
    assert generic_dn_cfg.scope == Scope.SCENARIO
    assert generic_dn_cfg.validity_period is None

    in_memory_dn_cfg = Config.configure_data_node("data_node_6", "in_memory")
    assert in_memory_dn_cfg.scope == Scope.SCENARIO
    assert in_memory_dn_cfg.validity_period is None

    pickle_dn_cfg = Config.configure_data_node("data_node_7", "pickle")
    assert pickle_dn_cfg.scope == Scope.SCENARIO
    assert pickle_dn_cfg.validity_period is None

    sql_table_dn_cfg = Config.configure_data_node(
        "data_node_8", "sql_table", db_name="test", db_engine="mssql", table_name="test"
    )
    assert sql_table_dn_cfg.scope == Scope.SCENARIO
    assert sql_table_dn_cfg.db_host == "localhost"
    assert sql_table_dn_cfg.db_port == 1433
    assert sql_table_dn_cfg.db_driver == ""
    assert sql_table_dn_cfg.sqlite_file_extension == ".db"
    assert sql_table_dn_cfg.exposed_type == "pandas"
    assert sql_table_dn_cfg.validity_period is None

    sql_dn_cfg = Config.configure_data_node(
        "data_node_9", "sql", db_name="test", db_engine="mssql", read_query="test", write_query_builder=print
    )
    assert sql_dn_cfg.scope == Scope.SCENARIO
    assert sql_dn_cfg.db_host == "localhost"
    assert sql_dn_cfg.db_port == 1433
    assert sql_dn_cfg.db_driver == ""
    assert sql_dn_cfg.sqlite_file_extension == ".db"
    assert sql_dn_cfg.exposed_type == "pandas"
    assert sql_dn_cfg.validity_period is None

    mongo_dn_cfg = Config.configure_data_node(
        "data_node_10", "mongo_collection", db_name="test", collection_name="test"
    )
    assert mongo_dn_cfg.scope == Scope.SCENARIO
    assert mongo_dn_cfg.db_host == "localhost"
    assert mongo_dn_cfg.db_port == 27017
    assert mongo_dn_cfg.custom_document == MongoDefaultDocument
    assert mongo_dn_cfg.db_username == ""
    assert mongo_dn_cfg.db_password == ""
    assert mongo_dn_cfg.db_driver == ""
    assert mongo_dn_cfg.validity_period is None

    aws_s3_object_dn_cfg = Config.configure_data_node(
        "data_node_11",
        "s3_object",
        aws_access_key="test",
        aws_secret_access_key="test_secret",
        aws_s3_bucket_name="test_bucket",
        aws_s3_object_key="test_file.txt",
    )
    assert aws_s3_object_dn_cfg.scope == Scope.SCENARIO
    assert aws_s3_object_dn_cfg.aws_access_key == "test"
    assert aws_s3_object_dn_cfg.aws_secret_access_key == "test_secret"
    assert aws_s3_object_dn_cfg.aws_s3_bucket_name == "test_bucket"
    assert aws_s3_object_dn_cfg.aws_s3_object_key == "test_file.txt"
    assert aws_s3_object_dn_cfg.aws_region is None
    assert aws_s3_object_dn_cfg.aws_s3_object_parameters is None
    assert aws_s3_object_dn_cfg.validity_period is None
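

# Config.check() collects issues on the configuration; blocking errors raise
# SystemExit, and the detailed messages are written to the logger (captured
# here through pytest's caplog fixture).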
def test_data_node_config_check(caplog):
    data_node_config = Config.configure_data_node("data_nodes1", "pickle")
    assert list(Config.data_nodes) == [DataNodeConfig._DEFAULT_KEY, data_node_config.id]

    data_node2_config = Config.configure_data_node("data_nodes2", "pickle")
    assert list(Config.data_nodes) == [DataNodeConfig._DEFAULT_KEY, data_node_config.id, data_node2_config.id]

    data_node3_config = Config.configure_data_node("data_nodes3", "csv", has_header=True, default_path="")
    assert list(Config.data_nodes) == [
        "default",
        data_node_config.id,
        data_node2_config.id,
        data_node3_config.id,
    ]

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", storage_type="bar")
        Config.check()
    expected_error_message = (
        "`storage_type` field of DataNodeConfig `data_nodes` must be either csv, sql_table,"
        " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory. Current"
        ' value of property `storage_type` is "bar".'
    )
    assert expected_error_message in caplog.text

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", scope="bar")
        Config.check()
    expected_error_message = (
        "`scope` field of DataNodeConfig `data_nodes` must be populated with a Scope value."
        ' Current value of property `scope` is "bar".'
    )
    assert expected_error_message in caplog.text

    with pytest.raises(TypeError):
        Config.configure_data_node("data_nodes", storage_type="sql")

    with pytest.raises(SystemExit):
        Config.configure_data_node("data_nodes", storage_type="generic")
        Config.check()
    # caplog accumulates over the whole test, so the `storage_type` error message
    # asserted above is still expected to be present here.
    expected_error_message = (
        "`storage_type` field of DataNodeConfig `data_nodes` must be either csv, sql_table,"
        " sql, mongo_collection, pickle, excel, generic, json, parquet, s3_object, or in_memory."
        ' Current value of property `storage_type` is "bar".'
    )
    assert expected_error_message in caplog.text
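

# Config.configure_data_node_from() derives a new configuration from an existing
# one: attributes passed explicitly override the source, everything else is
# inherited, including custom properties such as `foo` below.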
def test_configure_data_node_from_another_configuration():
    d1_cfg = Config.configure_sql_table_data_node(
        "d1",
        db_username="foo",
        db_password="bar",
        db_name="db",
        db_engine="mssql",
        db_port=8080,
        db_host="somewhere",
        table_name="foo",
        scope=Scope.GLOBAL,
        foo="bar",
    )

    d2_cfg = Config.configure_data_node_from(
        source_configuration=d1_cfg,
        id="d2",
        table_name="table_2",
    )
    assert d2_cfg.id == "d2"
    assert d2_cfg.storage_type == "sql_table"
    assert d2_cfg.scope == Scope.GLOBAL
    assert d2_cfg.validity_period is None
    assert d2_cfg.db_username == "foo"
    assert d2_cfg.db_password == "bar"
    assert d2_cfg.db_name == "db"
    assert d2_cfg.db_engine == "mssql"
    assert d2_cfg.db_port == 8080
    assert d2_cfg.db_host == "somewhere"
    assert d2_cfg.table_name == "table_2"
    assert d2_cfg.foo == "bar"

    d3_cfg = Config.configure_data_node_from(
        source_configuration=d1_cfg,
        id="d3",
        scope=Scope.SCENARIO,
        validity_period=datetime.timedelta(days=1),
        table_name="table_3",
        foo="baz",
    )
    assert d3_cfg.id == "d3"
    assert d3_cfg.storage_type == "sql_table"
    assert d3_cfg.scope == Scope.SCENARIO
    assert d3_cfg.validity_period == datetime.timedelta(days=1)
    assert d3_cfg.db_username == "foo"
    assert d3_cfg.db_password == "bar"
    assert d3_cfg.db_name == "db"
    assert d3_cfg.db_engine == "mssql"
    assert d3_cfg.db_port == 8080
    assert d3_cfg.db_host == "somewhere"
    assert d3_cfg.table_name == "table_3"
    assert d3_cfg.foo == "baz"
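

# Counts are offset by one: Config.data_nodes always holds the default
# configuration under the "default" key in addition to the user-defined ones.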
def test_data_node_count():
    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2

    Config.configure_data_node("data_nodes2", "pickle")
    assert len(Config.data_nodes) == 3

    Config.configure_data_node("data_nodes3", "pickle")
    assert len(Config.data_nodes) == 4
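

# Config.data_nodes behaves like a mapping keyed by configuration id.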
def test_data_node_getitem():
    data_node_id = "data_nodes1"
    data_node_config = Config.configure_data_node(data_node_id, "pickle", default_path="foo.p")

    assert Config.data_nodes[data_node_id].id == data_node_config.id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == data_node_config.storage_type
    assert Config.data_nodes[data_node_id].scope == data_node_config.scope
    assert Config.data_nodes[data_node_id].properties == data_node_config.properties
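

# Configuring the same id twice updates the existing entry instead of adding a
# duplicate.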
def test_data_node_creation_no_duplication():
    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2

    Config.configure_data_node("data_nodes1", "pickle")
    assert len(Config.data_nodes) == 2
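

# Arbitrary keyword arguments are stored as custom properties and keep their
# Python types (datetime, int, dict, ...).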
def test_date_node_create_with_datetime():
    data_node_config = Config.configure_data_node(
        id="datetime_data",
        my_property=datetime.datetime(1991, 1, 1),
        foo="hello",
        test=1,
        test_dict={"type": "Datetime", 2: "daw"},
    )
    assert data_node_config.foo == "hello"
    assert data_node_config.my_property == datetime.datetime(1991, 1, 1)
    assert data_node_config.test == 1
    assert data_node_config.test_dict.get("type") == "Datetime"
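

# Values of the form "ENV[VAR]" are resolved against os.environ when read
# through the public accessors, while the private attributes keep the raw
# template string.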
def test_data_node_with_env_variable_value():
    with mock.patch.dict(os.environ, {"FOO": "pickle", "BAR": "baz"}):
        Config.configure_data_node("data_node", storage_type="ENV[FOO]", prop="ENV[BAR]")
        assert Config.data_nodes["data_node"].prop == "baz"
        assert Config.data_nodes["data_node"].properties["prop"] == "baz"
        assert Config.data_nodes["data_node"]._properties["prop"] == "ENV[BAR]"
        assert Config.data_nodes["data_node"].storage_type == "pickle"
        assert Config.data_nodes["data_node"]._storage_type == "ENV[FOO]"
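

# ENV[...] templates are also resolved inside list-valued properties, such as
# the write_fct_args of a generic data node.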
def test_data_node_with_env_variable_in_write_fct_args():
    def read_fct(): ...

    def write_fct(): ...

    with mock.patch.dict(os.environ, {"FOO": "bar", "BAZ": "qux"}):
        Config.configure_data_node(
            "data_node",
            storage_type="generic",
            read_fct=read_fct,
            write_fct=write_fct,
            write_fct_args=["ENV[FOO]", "my_param", "ENV[BAZ]"],
        )
        assert Config.data_nodes["data_node"].write_fct_args == ["bar", "my_param", "qux"]
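

# Same resolution, this time for read_fct_args.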
def test_data_node_with_env_variable_in_read_fct_args():
    def read_fct(): ...

    def write_fct(): ...

    with mock.patch.dict(os.environ, {"FOO": "bar", "BAZ": "qux"}):
        Config.configure_data_node(
            "data_node",
            storage_type="generic",
            read_fct=read_fct,
            write_fct=write_fct,
            read_fct_args=["ENV[FOO]", "my_param", "ENV[BAZ]"],
        )
        assert Config.data_nodes["data_node"].read_fct_args == ["bar", "my_param", "qux"]
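

# Once the orchestrator dispatcher is built, the configuration is frozen: any
# further attribute update raises ConfigurationUpdateBlocked. This is checked
# first in development mode, then in standalone mode.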
def test_block_datanode_config_update_in_development_mode():
    data_node_id = "data_node_id"
    Config.configure_job_executions(mode=JobConfig._DEVELOPMENT_MODE)
    data_node_config = Config.configure_data_node(
        id=data_node_id,
        storage_type="pickle",
        default_path="foo.p",
        scope=Scope.SCENARIO,
    )

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}

    _OrchestratorFactory._build_dispatcher()

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.storage_type = "foo"

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.scope = Scope.SCENARIO

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.properties = {"foo": "bar"}

    # The blocked updates must leave the registered configuration unchanged.
    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}
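

# Same freeze behavior, this time with the standalone job execution mode.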
def test_block_datanode_config_update_in_standalone_mode():
    data_node_id = "data_node_id"
    Config.configure_job_executions(mode=JobConfig._STANDALONE_MODE)
    data_node_config = Config.configure_data_node(
        id=data_node_id,
        storage_type="pickle",
        default_path="foo.p",
        scope=Scope.SCENARIO,
    )

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}

    _OrchestratorFactory._build_dispatcher()

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.storage_type = "foo"

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.scope = Scope.SCENARIO

    with pytest.raises(ConfigurationUpdateBlocked):
        data_node_config.properties = {"foo": "bar"}

    assert Config.data_nodes[data_node_id].id == data_node_id
    assert Config.data_nodes[data_node_id].default_path == "foo.p"
    assert Config.data_nodes[data_node_id].storage_type == "pickle"
    assert Config.data_nodes[data_node_id].scope == Scope.SCENARIO
    assert Config.data_nodes[data_node_id].properties == {"default_path": "foo.p"}
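

# _clean() resets a configuration to its default values in place, so the object
# registered in Config.data_nodes keeps its identity.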
def test_clean_config():
    dn1_config = Config.configure_data_node(
        id="id1",
        storage_type="csv",
        default_path="foo.p",
        scope=Scope.GLOBAL,
        validity_period=datetime.timedelta(2),
    )
    dn2_config = Config.configure_data_node(
        id="id2",
        storage_type="json",
        default_path="bar.json",
        scope=Scope.GLOBAL,
        validity_period=datetime.timedelta(2),
    )

    assert Config.data_nodes["id1"] is dn1_config
    assert Config.data_nodes["id2"] is dn2_config

    dn1_config._clean()
    dn2_config._clean()

    # Check that the instances before and after _clean() are the same
    assert Config.data_nodes["id1"] is dn1_config
    assert Config.data_nodes["id2"] is dn2_config

    # Check that the values are reset to the defaults, while the ids are preserved
    assert dn1_config.id == "id1"
    assert dn2_config.id == "id2"
    assert dn1_config.storage_type == dn2_config.storage_type == "pickle"
    assert dn1_config.scope == dn2_config.scope == Scope.SCENARIO
    assert dn1_config.validity_period is dn2_config.validity_period is None
    assert dn1_config.default_path is dn2_config.default_path is None
    assert dn1_config.properties == dn2_config.properties == {}
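

# Windows-style backslashes in a configured path are normalized to forward
# slashes.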
def test_normalize_path():
    data_node_config = Config.configure_data_node(id="data_nodes1", storage_type="csv", path=r"data\file.csv")
    assert data_node_config.path == "data/file.csv"