# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import os
import zipfile

import pandas as pd
import pytest

import taipy.core.taipy as tp
from taipy import Config, Frequency, Scope
from taipy.core.exceptions import ExportPathAlreadyExists


@pytest.fixture(scope="function", autouse=True)
def clean_export_zip_file():
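    """Delete any leftover ./tmp.zip before and after each test so exports start from a clean state."""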
    if os.path.exists("./tmp.zip"):
        os.remove("./tmp.zip")
    yield
    if os.path.exists("./tmp.zip"):
        os.remove("./tmp.zip")
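

# Task functions used by the scenario configurations below.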
def plus_1(x):
    return x + 1


def plus_1_dataframe(x):
    return pd.DataFrame({"output": [x + 1]})


def configure_test_scenario(input_data, frequency=None):
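    """Build a scenario config with one pickle input feeding four tasks that write csv, excel, parquet, and json outputs."""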
    input_cfg = Config.configure_data_node(
        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
    )
    csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
    excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
    parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
    json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
    csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
    excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
    parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
    json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
    scenario_cfg = Config.configure_scenario(
        id=f"s_{input_data}",
        task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
        frequency=frequency,
    )
    return scenario_cfg


def test_export_scenario_with_and_without_zip_extension(tmp_path):
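    """The export target always ends up as a .zip archive; a missing or different extension gets .zip appended."""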
    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
    scenario = tp.create_scenario(scenario_cfg)
    tp.submit(scenario)

    # Export without the .zip extension should create the tmp.zip file
    tp.export_scenario(scenario.id, f"{tmp_path}/tmp")
    assert os.path.exists(f"{tmp_path}/tmp.zip")
    os.remove(f"{tmp_path}/tmp.zip")

    # Export with the .zip extension should also create the tmp.zip file
    tp.export_scenario(scenario.id, f"{tmp_path}/tmp.zip")
    assert os.path.exists(f"{tmp_path}/tmp.zip")

    # Export with another extension should create the tmp.<extension>.zip file
    tp.export_scenario(scenario.id, f"{tmp_path}/tmp.tar.gz")
    assert os.path.exists(f"{tmp_path}/tmp.tar.gz.zip")


def test_export_scenario_with_cycle(tmp_path):
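    """A scenario configured with a frequency belongs to a cycle, so the archive must include a cycles/ folder."""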
    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
    scenario = tp.create_scenario(scenario_cfg)
    submission = tp.submit(scenario)
    jobs = submission.jobs

    # Export the submitted scenario
    tp.export_scenario(scenario.id, "tmp.zip")
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path)
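
    # The archive should contain one JSON file per exported entity, grouped by entity type.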
    assert sorted(os.listdir(f"{tmp_path}/data_nodes")) == sorted(
        [
            f"{scenario.i_1.id}.json",
            f"{scenario.o_1_csv.id}.json",
            f"{scenario.o_1_excel.id}.json",
            f"{scenario.o_1_parquet.id}.json",
            f"{scenario.o_1_json.id}.json",
        ]
    )
    assert sorted(os.listdir(f"{tmp_path}/tasks")) == sorted(
        [
            f"{scenario.t_1_csv.id}.json",
            f"{scenario.t_1_excel.id}.json",
            f"{scenario.t_1_parquet.id}.json",
            f"{scenario.t_1_json.id}.json",
        ]
    )
    assert sorted(os.listdir(f"{tmp_path}/scenarios")) == sorted([f"{scenario.id}.json"])
    assert sorted(os.listdir(f"{tmp_path}/jobs")) == sorted(
        [f"{jobs[0].id}.json", f"{jobs[1].id}.json", f"{jobs[2].id}.json", f"{jobs[3].id}.json"]
    )
    assert os.listdir(f"{tmp_path}/submission") == [f"{submission.id}.json"]
    assert sorted(os.listdir(f"{tmp_path}/cycles")) == sorted([f"{scenario.cycle.id}.json"])


def test_export_scenario_without_cycle(tmp_path):
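    """A scenario configured without a frequency has no cycle, so the archive must not contain a cycles/ folder."""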
    scenario_cfg = configure_test_scenario(1)
    scenario = tp.create_scenario(scenario_cfg)
    tp.submit(scenario)

    # Export the submitted scenario
    tp.export_scenario(scenario.id, "tmp.zip")
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path)

    assert os.path.exists(f"{tmp_path}/data_nodes")
    assert os.path.exists(f"{tmp_path}/tasks")
    assert os.path.exists(f"{tmp_path}/scenarios")
    assert os.path.exists(f"{tmp_path}/jobs")
    assert os.path.exists(f"{tmp_path}/submission")
    assert not os.path.exists(f"{tmp_path}/cycles")  # No cycle


def test_export_scenario_override_existing_files(tmp_path):
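    """Exporting to an existing archive raises ExportPathAlreadyExists unless override=True is passed."""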
    scenario_1_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
    scenario_2_cfg = configure_test_scenario(2)
    scenario_1 = tp.create_scenario(scenario_1_cfg)
    tp.submit(scenario_1)

    # Export the submitted scenario_1
    tp.export_scenario(scenario_1.id, "tmp.zip")
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path / "scenario_1")
    assert os.path.exists(f"{tmp_path}/scenario_1/data_nodes")
    assert os.path.exists(f"{tmp_path}/scenario_1/tasks")
    assert os.path.exists(f"{tmp_path}/scenario_1/scenarios")
    assert os.path.exists(f"{tmp_path}/scenario_1/jobs")
    assert os.path.exists(f"{tmp_path}/scenario_1/submission")
    assert os.path.exists(f"{tmp_path}/scenario_1/cycles")

    scenario_2 = tp.create_scenario(scenario_2_cfg)
    tp.submit(scenario_2)

    # Exporting the submitted scenario_2 to the same path should raise an error
    with pytest.raises(ExportPathAlreadyExists):
        tp.export_scenario(scenario_2.id, "tmp.zip")

    # Export the submitted scenario_2, which has no cycle, and override the existing files
    tp.export_scenario(scenario_2.id, "tmp.zip", override=True)
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path / "scenario_2")
    assert os.path.exists(f"{tmp_path}/scenario_2/data_nodes")
    assert os.path.exists(f"{tmp_path}/scenario_2/tasks")
    assert os.path.exists(f"{tmp_path}/scenario_2/scenarios")
    assert os.path.exists(f"{tmp_path}/scenario_2/jobs")
    assert os.path.exists(f"{tmp_path}/scenario_2/submission")
    # The cycles folder should not exist since the new scenario does not have a cycle
    assert not os.path.exists(f"{tmp_path}/scenario_2/cycles")


def test_export_scenario_filesystem_with_data(tmp_path):
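    """With include_data=True, the underlying files of file-based data nodes are exported under user_data/."""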
    scenario_cfg = configure_test_scenario(1)
    scenario = tp.create_scenario(scenario_cfg)
    tp.submit(scenario)

    # Export scenario without data
    tp.export_scenario(scenario.id, "tmp.zip")
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path / "scenario_without_data")
    assert not os.path.exists(f"{tmp_path}/scenario_without_data/user_data")

    # Export scenario with data
    tp.export_scenario(scenario.id, "tmp.zip", include_data=True, override=True)
    with zipfile.ZipFile("./tmp.zip") as zip_file:
        zip_file.extractall(tmp_path / "scenario_with_data")
    assert os.path.exists(f"{tmp_path}/scenario_with_data/user_data")

    data_files = [f for _, _, files in os.walk(f"{tmp_path}/scenario_with_data/user_data") for f in files]
    assert sorted(data_files) == sorted(
        [
            f"{scenario.i_1.id}.p",
            f"{scenario.o_1_csv.id}.csv",
            f"{scenario.o_1_excel.id}.xlsx",
            f"{scenario.o_1_parquet.id}.parquet",
            f"{scenario.o_1_json.id}.json",
        ]
    )


def test_export_non_file_based_data_node_raise_warning(caplog):
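    """Exporting data for a non-file-based (in_memory) data node should log a warning that its data is skipped."""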
    input_cfg = Config.configure_data_node(id="i", storage_type="pickle", scope=Scope.SCENARIO, default_data=1)
    csv_output_cfg = Config.configure_data_node(id="o_csv", storage_type="csv")
    in_mem_output_cfg = Config.configure_data_node(id="o_mem", storage_type="in_memory")
    csv_task_cfg = Config.configure_task("t_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
    in_mem_task_cfg = Config.configure_task("t_mem", plus_1, input_cfg, in_mem_output_cfg)
    scenario_cfg = Config.configure_scenario(id="s", task_configs=[csv_task_cfg, in_mem_task_cfg])

    scenario = tp.create_scenario(scenario_cfg)
    tp.submit(scenario)

    # Export scenario with in-memory data node
    tp.export_scenario(scenario.id, "tmp.zip", include_data=True)
    expected_warning = f"Data node {scenario.o_mem.id} is not a file-based data node and the data will not be exported"
    assert expected_warning in caplog.text