test_task_config.py 8.8 KB


  1. # Copyright 2023 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. from unittest import mock
  13. from src.taipy.core.config import DataNodeConfig
  14. from taipy.config.common.scope import Scope
  15. from taipy.config.config import Config
  16. from tests.core.utils.named_temporary_file import NamedTemporaryFile
  17. def _configure_task_in_toml():
  18. return NamedTemporaryFile(
  19. content="""
  20. [TAIPY]
  21. [DATA_NODE.input]
  22. [DATA_NODE.output]
  23. [TASK.tasks1]
  24. function = "builtins.print:function"
  25. inputs = [ "input:SECTION",]
  26. outputs = [ "output:SECTION",]
  27. """
  28. )
  29. def _check_data_nodes_instance(dn_id, task_id):
  30. """Check if the data node instance in the task config correctly points to the Config._applied_config,
  31. not the Config._python_config or the Config._file_config
  32. """
  33. dn_config_applied_instance = Config.data_nodes[dn_id]
  34. for dn in Config.tasks[task_id].inputs:
  35. if dn.id == dn_id:
  36. dn_config_instance_via_task = dn
  37. for dn in Config.tasks[task_id].outputs:
  38. if dn.id == dn_id:
  39. dn_config_instance_via_task = dn
  40. dn_config_python_instance = None
  41. if Config._python_config._sections.get("DATA_NODE", None):
  42. dn_config_python_instance = Config._python_config._sections["DATA_NODE"][dn_id]
  43. dn_config_file_instance = None
  44. if Config._file_config._sections.get("DATA_NODE", None):
  45. dn_config_file_instance = Config._file_config._sections["DATA_NODE"][dn_id]
  46. if dn_config_python_instance:
  47. assert dn_config_python_instance.scope is None
  48. assert dn_config_python_instance is not dn_config_applied_instance
  49. assert dn_config_python_instance is not dn_config_instance_via_task
  50. if dn_config_file_instance:
  51. assert dn_config_file_instance.scope is None
  52. assert dn_config_file_instance is not dn_config_applied_instance
  53. assert dn_config_file_instance is not dn_config_instance_via_task
  54. assert dn_config_applied_instance.scope == DataNodeConfig._DEFAULT_SCOPE
  55. assert dn_config_instance_via_task is dn_config_applied_instance
  56. def test_data_node_instance_when_configure_task_in_python():
  57. input_config = Config.configure_data_node("input")
  58. output_config = Config.configure_data_node("output")
  59. Config.configure_task("tasks1", print, input_config, output_config)
  60. _check_data_nodes_instance("input", "tasks1")
  61. _check_data_nodes_instance("output", "tasks1")
  62. def test_data_node_instance_when_configure_task_by_loading_toml():
  63. toml_config = _configure_task_in_toml()
  64. Config.load(toml_config.filename)
  65. _check_data_nodes_instance("input", "tasks1")
  66. _check_data_nodes_instance("output", "tasks1")
  67. def test_data_node_instance_when_configure_task_by_overriding_toml():
  68. toml_config = _configure_task_in_toml()
  69. Config.override(toml_config.filename)
  70. _check_data_nodes_instance("input", "tasks1")
  71. _check_data_nodes_instance("output", "tasks1")
  72. def test_task_config_creation():
  73. input_config = Config.configure_data_node("input")
  74. output_config = Config.configure_data_node("output")
  75. task_config = Config.configure_task("tasks1", print, input_config, output_config)
  76. assert not task_config.skippable
  77. assert list(Config.tasks) == ["default", task_config.id]
  78. task2 = Config.configure_task("tasks2", print, input_config, output_config, skippable=True)
  79. assert task2.skippable
  80. assert list(Config.tasks) == ["default", task_config.id, task2.id]
  81. def test_task_count():
  82. input_config = Config.configure_data_node("input")
  83. output_config = Config.configure_data_node("output")
  84. Config.configure_task("tasks1", print, input_config, output_config)
  85. assert len(Config.tasks) == 2
  86. Config.configure_task("tasks2", print, input_config, output_config)
  87. assert len(Config.tasks) == 3
  88. Config.configure_task("tasks3", print, input_config, output_config)
  89. assert len(Config.tasks) == 4
  90. def test_task_getitem():
  91. input_config = Config.configure_data_node("input")
  92. output_config = Config.configure_data_node("output")
  93. task_id = "tasks1"
  94. task_cfg = Config.configure_task(task_id, print, input_config, output_config)
  95. assert Config.tasks[task_id].id == task_cfg.id
  96. assert Config.tasks[task_id].properties == task_cfg.properties
  97. assert Config.tasks[task_id].function == task_cfg.function
  98. assert Config.tasks[task_id].input_configs == task_cfg.input_configs
  99. assert Config.tasks[task_id].output_configs == task_cfg.output_configs
  100. assert Config.tasks[task_id].skippable == task_cfg.skippable
  101. def test_task_creation_no_duplication():
  102. input_config = Config.configure_data_node("input")
  103. output_config = Config.configure_data_node("output")
  104. Config.configure_task("tasks1", print, input_config, output_config)
  105. assert len(Config.tasks) == 2
  106. Config.configure_task("tasks1", print, input_config, output_config)
  107. assert len(Config.tasks) == 2
  108. def test_task_config_with_env_variable_value():
  109. input_config = Config.configure_data_node("input")
  110. output_config = Config.configure_data_node("output")
  111. with mock.patch.dict(os.environ, {"FOO": "plop", "BAR": "baz"}):
  112. Config.configure_task("task_name", print, input_config, output_config, prop="ENV[BAR]")
  113. assert Config.tasks["task_name"].prop == "baz"
  114. assert Config.tasks["task_name"].properties["prop"] == "baz"
  115. assert Config.tasks["task_name"]._properties["prop"] == "ENV[BAR]"
  116. def test_clean_config():
  117. dn1 = Config.configure_data_node("dn1")
  118. dn2 = Config.configure_data_node("dn2")
  119. task1_config = Config.configure_task("id1", print, dn1, dn2)
  120. task2_config = Config.configure_task("id2", print, dn2, dn1)
  121. assert Config.tasks["id1"] is task1_config
  122. assert Config.tasks["id2"] is task2_config
  123. task1_config._clean()
  124. task2_config._clean()
  125. # Check if the instance before and after _clean() is the same
  126. assert Config.tasks["id1"] is task1_config
  127. assert Config.tasks["id2"] is task2_config
  128. assert task1_config.id == "id1"
  129. assert task2_config.id == "id2"
  130. assert task1_config.function is task1_config.function is None
  131. assert task1_config.inputs == task1_config.inputs == []
  132. assert task1_config.input_configs == task1_config.input_configs == []
  133. assert task1_config.outputs == task1_config.outputs == []
  134. assert task1_config.output_configs == task1_config.output_configs == []
  135. assert task1_config.skippable is task1_config.skippable is False
  136. assert task1_config.properties == task1_config.properties == {}
  137. def test_deprecated_cacheable_attribute_remains_compatible():
  138. dn_1_id = "dn_1_id"
  139. dn_1_config = Config.configure_data_node(
  140. id=dn_1_id,
  141. storage_type="pickle",
  142. cacheable=False,
  143. scope=Scope.SCENARIO,
  144. )
  145. assert Config.data_nodes[dn_1_id].id == dn_1_id
  146. assert Config.data_nodes[dn_1_id].storage_type == "pickle"
  147. assert Config.data_nodes[dn_1_id].scope == Scope.SCENARIO
  148. assert Config.data_nodes[dn_1_id].properties == {"cacheable": False}
  149. assert not Config.data_nodes[dn_1_id].cacheable
  150. dn_1_config.cacheable = True
  151. assert Config.data_nodes[dn_1_id].properties == {"cacheable": True}
  152. assert Config.data_nodes[dn_1_id].cacheable
  153. dn_2_id = "dn_2_id"
  154. dn_2_config = Config.configure_data_node(
  155. id=dn_2_id,
  156. storage_type="pickle",
  157. cacheable=True,
  158. scope=Scope.SCENARIO,
  159. )
  160. assert Config.data_nodes[dn_2_id].id == dn_2_id
  161. assert Config.data_nodes[dn_2_id].storage_type == "pickle"
  162. assert Config.data_nodes[dn_2_id].scope == Scope.SCENARIO
  163. assert Config.data_nodes[dn_2_id].properties == {"cacheable": True}
  164. assert Config.data_nodes[dn_2_id].cacheable
  165. dn_2_config.cacheable = False
  166. assert Config.data_nodes[dn_1_id].properties == {"cacheable": False}
  167. assert not Config.data_nodes[dn_1_id].cacheable
  168. dn_3_id = "dn_3_id"
  169. dn_3_config = Config.configure_data_node(
  170. id=dn_3_id,
  171. storage_type="pickle",
  172. scope=Scope.SCENARIO,
  173. )
  174. assert Config.data_nodes[dn_3_id].id == dn_3_id
  175. assert Config.data_nodes[dn_3_id].storage_type == "pickle"
  176. assert Config.data_nodes[dn_3_id].scope == Scope.SCENARIO
  177. assert Config.data_nodes[dn_3_id].properties == {}
  178. assert not Config.data_nodes[dn_3_id].cacheable
  179. dn_3_config.cacheable = True
  180. assert Config.data_nodes[dn_3_id].properties == {"cacheable": True}
  181. assert Config.data_nodes[dn_3_id].cacheable