test_export.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  # Copyright 2021-2024 Avaiga Private Limited
  #
  # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  # the License. You may obtain a copy of the License at
  #
  # http://www.apache.org/licenses/LICENSE-2.0
  #
  # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  # specific language governing permissions and limitations under the License.
  11. import os
  12. import shutil
  13. import pandas as pd
  14. import pytest
  15. import taipy.core.taipy as tp
  16. from taipy import Config, Frequency, Scope
  17. from taipy.core.exceptions import ExportFolderAlreadyExists, InvalidExportPath
  @pytest.fixture(scope="function", autouse=True)
  def clean_tmp_folder():
      """Remove the ``./tmp`` directory before and after every test.

      The fixture is function-scoped and ``autouse``, so it wraps each test in
      this module without being requested explicitly. The tests export
      scenarios under ``./tmp``; clearing it on both sides of the ``yield``
      keeps one test's export from being seen by the next.
      """
      # ignore_errors=True: the directory may legitimately not exist yet.
      shutil.rmtree("./tmp", ignore_errors=True)
      yield
      shutil.rmtree("./tmp", ignore_errors=True)
  def plus_1(x):
      """Return ``x + 1``.

      Used as the task function for the JSON and in-memory output data nodes
      configured in this module.
      """
      return x + 1
  def plus_1_dataframe(x):
      """Return a one-row DataFrame whose ``output`` column holds ``x + 1``.

      Used as the task function for the CSV, Excel and Parquet output data
      nodes configured in this module.
      """
      return pd.DataFrame({"output": [x + 1]})
  def configure_test_scenario(input_data, frequency=None):
      """Configure a scenario with one pickle input and four file-based outputs.

      Args:
          input_data: Default value of the input data node. It is also embedded
              in every config id (``i_<input_data>``, ``o_<input_data>_<format>``,
              ``t_<input_data>_<format>``, ``s_<input_data>``), so distinct
              values yield distinct configurations.
          frequency: Passed through to ``Config.configure_scenario``; ``None``
              by default.

      Returns:
          The scenario configuration returned by ``Config.configure_scenario``.
      """
      input_cfg = Config.configure_data_node(
          id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
      )

      # Output storage type -> function of the task that writes it. The JSON
      # task returns the bare value; the other three return a DataFrame.
      # Insertion order fixes the order of the configure_* calls below.
      task_functions = {
          "csv": plus_1_dataframe,
          "excel": plus_1_dataframe,
          "parquet": plus_1_dataframe,
          "json": plus_1,
      }

      # All output data nodes are configured first, then all tasks.
      output_cfgs = {
          fmt: Config.configure_data_node(id=f"o_{input_data}_{fmt}", storage_type=fmt) for fmt in task_functions
      }
      task_cfgs = [
          Config.configure_task(f"t_{input_data}_{fmt}", function, input_cfg, output_cfgs[fmt])
          for fmt, function in task_functions.items()
      ]

      return Config.configure_scenario(
          id=f"s_{input_data}",
          task_configs=task_cfgs,
          frequency=frequency,
      )
  def test_export_scenario_to_the_storage_folder():
      """Exporting into the configured Taipy storage folder raises ``InvalidExportPath``."""
      scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
      scenario = tp.create_scenario(scenario_cfg)

      with pytest.raises(InvalidExportPath):
          tp.export_scenario(scenario.id, Config.core.taipy_storage_folder)
  def test_export_scenario_with_cycle():
      """Exporting a submitted daily scenario writes one JSON file per related entity.

      Checks the ``data_nodes``, ``tasks``, ``scenarios``, ``jobs``,
      ``submission`` and ``cycles`` sub-folders of the export directory against
      the ids of the entities attached to the scenario and its submission.
      """
      scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
      scenario = tp.create_scenario(scenario_cfg)
      submission = tp.submit(scenario)
      jobs = submission.jobs

      # Export the submitted scenario
      export_dir = "./tmp/exp_scenario"
      tp.export_scenario(scenario.id, export_dir)

      def exported_files(entity_folder):
          # os.listdir returns entries in arbitrary order, so sort before comparing.
          return sorted(os.listdir(f"{export_dir}/{entity_folder}"))

      def expected_files(entities):
          return sorted(f"{entity.id}.json" for entity in entities)

      assert exported_files("data_nodes") == expected_files(
          [
              scenario.i_1,
              scenario.o_1_csv,
              scenario.o_1_excel,
              scenario.o_1_parquet,
              scenario.o_1_json,
          ]
      )
      assert exported_files("tasks") == expected_files(
          [
              scenario.t_1_csv,
              scenario.t_1_excel,
              scenario.t_1_parquet,
              scenario.t_1_json,
          ]
      )
      assert exported_files("scenarios") == expected_files([scenario])
      # Four tasks are configured, and exactly four job files are expected.
      assert len(jobs) == 4
      assert exported_files("jobs") == expected_files(jobs)
      assert exported_files("submission") == expected_files([submission])
      assert exported_files("cycles") == expected_files([scenario.cycle])
  def test_export_scenario_without_cycle():
      """Exporting a scenario configured without a frequency creates no ``cycles`` folder."""
      scenario_cfg = configure_test_scenario(1)
      scenario = tp.create_scenario(scenario_cfg)
      tp.submit(scenario)

      # Export the submitted scenario
      export_dir = "./tmp/exp_scenario"
      tp.export_scenario(scenario.id, export_dir)

      for folder in ("data_nodes", "tasks", "scenarios", "jobs", "submission"):
          assert os.path.exists(f"{export_dir}/{folder}")
      assert not os.path.exists(f"{export_dir}/cycles")  # No cycle
  def test_export_scenario_override_existing_files():
      """Re-exporting into an existing folder fails unless ``override=True`` is passed.

      A daily scenario is exported first, so a ``cycles`` folder exists. A
      second scenario without a frequency is then exported to the same path:
      without ``override`` it must raise ``ExportFolderAlreadyExists``; with
      ``override=True`` it succeeds and the stale ``cycles`` folder is gone.
      """
      export_dir = "./tmp/exp_scenario"
      common_folders = ("data_nodes", "tasks", "scenarios", "jobs", "submission")

      scenario_1_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
      scenario_2_cfg = configure_test_scenario(2)

      scenario_1 = tp.create_scenario(scenario_1_cfg)
      tp.submit(scenario_1)

      # Export the submitted scenario_1
      tp.export_scenario(scenario_1.id, export_dir)
      for folder in common_folders:
          assert os.path.exists(f"{export_dir}/{folder}")
      assert os.path.exists(f"{export_dir}/cycles")

      scenario_2 = tp.create_scenario(scenario_2_cfg)
      tp.submit(scenario_2)

      # Export the submitted scenario_2 to the same folder should raise an error
      with pytest.raises(ExportFolderAlreadyExists):
          tp.export_scenario(scenario_2.id, export_dir)

      # Export the submitted scenario_2 without a cycle and override the existing files
      tp.export_scenario(scenario_2.id, export_dir, override=True)
      for folder in common_folders:
          assert os.path.exists(f"{export_dir}/{folder}")
      # The cycles folder should be removed when overriding
      assert not os.path.exists(f"{export_dir}/cycles")
  def test_export_scenario_filesystem_with_data():
      """``include_data=True`` adds a ``user_data`` folder holding each data node's file.

      Without ``include_data`` the export must not contain ``user_data``. With
      it, the folder tree must contain exactly one file per data node, named
      ``<data node id>.<extension>``.
      """
      scenario_cfg = configure_test_scenario(1)
      scenario = tp.create_scenario(scenario_cfg)
      tp.submit(scenario)

      export_dir = "./tmp/exp_scenario"
      user_data_dir = f"{export_dir}/user_data"

      # Export scenario without data
      tp.export_scenario(scenario.id, export_dir)
      assert not os.path.exists(user_data_dir)

      # Export scenario with data
      tp.export_scenario(scenario.id, export_dir, include_data=True, override=True)
      assert os.path.exists(user_data_dir)

      # Collect file names from the whole tree; sub-directory layout is not checked.
      data_files = [name for _, _, files in os.walk(user_data_dir) for name in files]
      assert sorted(data_files) == sorted(
          [
              f"{scenario.i_1.id}.p",
              f"{scenario.o_1_csv.id}.csv",
              f"{scenario.o_1_excel.id}.xlsx",
              f"{scenario.o_1_parquet.id}.parquet",
              f"{scenario.o_1_json.id}.json",
          ]
      )
  def test_export_non_file_based_data_node_raise_warning(caplog):
      """Exporting with data logs a warning for an in-memory data node.

      Args:
          caplog: pytest's log-capture fixture; its ``text`` is searched for
              the expected warning message.
      """
      input_cfg = Config.configure_data_node(id="i", storage_type="pickle", scope=Scope.SCENARIO, default_data=1)
      csv_output_cfg = Config.configure_data_node(id="o_csv", storage_type="csv")
      in_mem_output_cfg = Config.configure_data_node(id="o_mem", storage_type="in_memory")

      csv_task_cfg = Config.configure_task("t_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
      in_mem_task_cfg = Config.configure_task("t_mem", plus_1, input_cfg, in_mem_output_cfg)

      scenario_cfg = Config.configure_scenario(id="s", task_configs=[csv_task_cfg, in_mem_task_cfg])
      scenario = tp.create_scenario(scenario_cfg)
      tp.submit(scenario)

      # Export scenario with in-memory data node
      tp.export_scenario(scenario.id, "./tmp/exp_scenario", include_data=True)

      expected_warning = (
          f"Data node {scenario.o_mem.id} is not a file-based data node and the data will not be exported"
      )
      assert expected_warning in caplog.text