Sfoglia il codice sorgente

tests: add export tests on sql repo

trgiangdo 1 anno fa
parent
commit
82af3af4f1
1 ha cambiato i file con 193 aggiunte e 0 eliminazioni
  1. 193 0
      tests/core/test_taipy/test_export_with_sql_repo.py

+ 193 - 0
tests/core/test_taipy/test_export_with_sql_repo.py

@@ -0,0 +1,193 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import shutil
+
+import pandas as pd
+import pytest
+
+import taipy.core.taipy as tp
+from taipy import Config, Frequency, Scope
+from taipy.core.exceptions import ExportFolderAlreadyExists, InvalidExportPath
+
+
+@pytest.fixture(scope="function", autouse=True)
+def clean_tmp_folder():
+    shutil.rmtree("./tmp", ignore_errors=True)
+    yield
+    shutil.rmtree("./tmp", ignore_errors=True)
+
+
+def plus_1(x):
+    return x + 1
+
+
+def plus_1_dataframe(x):
+    return pd.DataFrame({"output": [x + 1]})
+
+
+def configure_test_scenario(input_data, frequency=None):
+    input_cfg = Config.configure_data_node(
+        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
+    )
+    csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
+    excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
+    parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
+    json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
+
+    csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
+    excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
+    parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
+    json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
+    scenario_cfg = Config.configure_scenario(
+        id=f"s_{input_data}",
+        task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
+        frequency=frequency,
+    )
+
+    return scenario_cfg
+
+
+def test_export_scenario_to_the_storage_folder(init_sql_repo):
+    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
+    scenario = tp.create_scenario(scenario_cfg)
+
+    with pytest.raises(InvalidExportPath):
+        tp.export_scenario(scenario.id, Config.core.taipy_storage_folder)
+
+
+def test_export_scenario_with_cycle(init_sql_repo):
+    scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
+
+    scenario = tp.create_scenario(scenario_cfg)
+    submission = tp.submit(scenario)
+    jobs = submission.jobs
+
+    # Export the submitted scenario
+    tp.export_scenario(scenario.id, "./tmp/exp_scenario")
+
+    assert sorted(os.listdir("./tmp/exp_scenario/data_node")) == sorted(
+        [
+            f"{scenario.i_1.id}.json",
+            f"{scenario.o_1_csv.id}.json",
+            f"{scenario.o_1_excel.id}.json",
+            f"{scenario.o_1_parquet.id}.json",
+            f"{scenario.o_1_json.id}.json",
+        ]
+    )
+    assert sorted(os.listdir("./tmp/exp_scenario/task")) == sorted(
+        [
+            f"{scenario.t_1_csv.id}.json",
+            f"{scenario.t_1_excel.id}.json",
+            f"{scenario.t_1_parquet.id}.json",
+            f"{scenario.t_1_json.id}.json",
+        ]
+    )
+    assert sorted(os.listdir("./tmp/exp_scenario/scenario")) == sorted([f"{scenario.id}.json"])
+    assert sorted(os.listdir("./tmp/exp_scenario/job")) == sorted(
+        [f"{jobs[0].id}.json", f"{jobs[1].id}.json", f"{jobs[2].id}.json", f"{jobs[3].id}.json"]
+    )
+    assert os.listdir("./tmp/exp_scenario/submission") == [f"{submission.id}.json"]
+    assert sorted(os.listdir("./tmp/exp_scenario/cycle")) == sorted([f"{scenario.cycle.id}.json"])
+
+
+def test_export_scenario_without_cycle(init_sql_repo):
+    scenario_cfg = configure_test_scenario(1)
+
+    scenario = tp.create_scenario(scenario_cfg)
+    tp.submit(scenario)
+
+    # Export the submitted scenario
+    tp.export_scenario(scenario.id, "./tmp/exp_scenario")
+
+    assert os.path.exists("./tmp/exp_scenario/data_node")
+    assert os.path.exists("./tmp/exp_scenario/task")
+    assert os.path.exists("./tmp/exp_scenario/scenario")
+    assert os.path.exists("./tmp/exp_scenario/job")
+    assert os.path.exists("./tmp/exp_scenario/submission")
+    assert not os.path.exists("./tmp/exp_scenario/cycle")  # No cycle
+
+
+def test_export_scenario_override_existing_files(init_sql_repo):
+    scenario_1_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
+    scenario_2_cfg = configure_test_scenario(2)
+
+    scenario_1 = tp.create_scenario(scenario_1_cfg)
+    tp.submit(scenario_1)
+
+    # Export the submitted scenario_1
+    tp.export_scenario(scenario_1.id, "./tmp/exp_scenario")
+    assert os.path.exists("./tmp/exp_scenario/data_node")
+    assert os.path.exists("./tmp/exp_scenario/task")
+    assert os.path.exists("./tmp/exp_scenario/scenario")
+    assert os.path.exists("./tmp/exp_scenario/job")
+    assert os.path.exists("./tmp/exp_scenario/submission")
+    assert os.path.exists("./tmp/exp_scenario/cycle")
+
+    scenario_2 = tp.create_scenario(scenario_2_cfg)
+    tp.submit(scenario_2)
+
+    # Export the submitted scenario_2 to the same folder should raise an error
+    with pytest.raises(ExportFolderAlreadyExists):
+        tp.export_scenario(scenario_2.id, "./tmp/exp_scenario")
+
+    # Export the submitted scenario_2 without a cycle and override the existing files
+    tp.export_scenario(scenario_2.id, "./tmp/exp_scenario", override=True)
+    assert os.path.exists("./tmp/exp_scenario/data_node")
+    assert os.path.exists("./tmp/exp_scenario/task")
+    assert os.path.exists("./tmp/exp_scenario/scenario")
+    assert os.path.exists("./tmp/exp_scenario/job")
+    assert os.path.exists("./tmp/exp_scenario/submission")
+    # The cycles folder should be removed when overriding
+    assert not os.path.exists("./tmp/exp_scenario/cycle")
+
+
+def test_export_scenario_filesystem_with_data(init_sql_repo):
+    scenario_cfg = configure_test_scenario(1)
+    scenario = tp.create_scenario(scenario_cfg)
+    tp.submit(scenario)
+
+    # Export scenario without data
+    tp.export_scenario(scenario.id, "./tmp/exp_scenario")
+    assert not os.path.exists("./tmp/exp_scenario/user_data")
+
+    # Export scenario with data
+    tp.export_scenario(scenario.id, "./tmp/exp_scenario", include_data=True, override=True)
+    assert os.path.exists("./tmp/exp_scenario/user_data")
+    data_files = [f for _, _, files in os.walk("./tmp/exp_scenario/user_data") for f in files]
+    assert sorted(data_files) == sorted(
+        [
+            f"{scenario.i_1.id}.p",
+            f"{scenario.o_1_csv.id}.csv",
+            f"{scenario.o_1_excel.id}.xlsx",
+            f"{scenario.o_1_parquet.id}.parquet",
+            f"{scenario.o_1_json.id}.json",
+        ]
+    )
+
+
+def test_export_non_file_based_data_node_raise_warning(init_sql_repo, caplog):
+    input_cfg = Config.configure_data_node(id="i", storage_type="pickle", scope=Scope.SCENARIO, default_data=1)
+    csv_output_cfg = Config.configure_data_node(id="o_csv", storage_type="csv")
+    in_mem_output_cfg = Config.configure_data_node(id="o_mem", storage_type="in_memory")
+
+    csv_task_cfg = Config.configure_task("t_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
+    in_mem_task_cfg = Config.configure_task("t_mem", plus_1, input_cfg, in_mem_output_cfg)
+    scenario_cfg = Config.configure_scenario(id="s", task_configs=[csv_task_cfg, in_mem_task_cfg])
+
+    scenario = tp.create_scenario(scenario_cfg)
+    tp.submit(scenario)
+
+    # Export scenario with in-memory data node
+    tp.export_scenario(scenario.id, "./tmp/exp_scenario", include_data=True)
+    expected_warning = f"Data node {scenario.o_mem.id} is not a file-based data node and the data will not be exported"
+    assert expected_warning in caplog.text