123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- import shutil
- import zipfile
- import pandas as pd
- import pytest
- import taipy.core.taipy as tp
- from taipy import Config, Frequency, Scope
- from taipy.core._version._version_manager import _VersionManager
- from taipy.core.cycle._cycle_manager import _CycleManager
- from taipy.core.data._data_manager import _DataManager
- from taipy.core.exceptions.exceptions import (
- ConflictedConfigurationError,
- EntitiesToBeImportAlredyExist,
- ImportArchiveDoesntContainAnyScenario,
- ImportScenarioDoesntHaveAVersion,
- )
- from taipy.core.job._job_manager import _JobManager
- from taipy.core.scenario._scenario_manager import _ScenarioManager
- from taipy.core.submission._submission_manager import _SubmissionManager
- from taipy.core.task._task_manager import _TaskManager
@pytest.fixture(scope="function", autouse=True)
def clean_export_zip_file():
    """Delete ``./tmp.zip`` before and after every test function.

    The fixture is autouse, so each test starts without a leftover archive
    and does not leave one behind.
    """

    def _drop_archive():
        # Only attempt the removal when the archive is actually there.
        if os.path.exists("./tmp.zip"):
            os.remove("./tmp.zip")

    _drop_archive()
    yield
    _drop_archive()
def plus_1(x):
    """Return ``x + 1``; used as the function of the JSON-output task."""
    incremented = x + 1
    return incremented
def plus_1_dataframe(x):
    """Return a one-row DataFrame whose ``output`` column holds ``x + 1``."""
    columns = {"output": [x + 1]}
    return pd.DataFrame(columns)
def configure_test_scenario(input_data, frequency=None):
    """Configure a scenario with one input data node and four output tasks.

    A pickle input data node (scenario scope, defaulting to ``input_data``)
    feeds four tasks, each writing to its own output data node: csv, excel,
    parquet and json. The tabular outputs use ``plus_1_dataframe``; the json
    output uses ``plus_1``. All ids are derived from ``input_data``.

    Args:
        input_data: Default value of the input data node; also used to build
            the ids of every config declared here.
        frequency: Frequency passed through to the scenario config.

    Returns:
        The scenario config returned by ``Config.configure_scenario``.
    """
    # Data nodes are declared first, in the same order as the tasks below.
    source_dn = Config.configure_data_node(
        id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
    )
    csv_dn = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
    excel_dn = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
    parquet_dn = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
    json_dn = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")

    tasks = [
        Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, source_dn, csv_dn),
        Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, source_dn, excel_dn),
        Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, source_dn, parquet_dn),
        Config.configure_task(f"t_{input_data}_json", plus_1, source_dn, json_dn),
    ]

    return Config.configure_scenario(
        id=f"s_{input_data}",
        task_configs=tasks,
        frequency=frequency,
    )
def export_test_scenario(scenario_cfg, export_path="tmp.zip", override=False, include_data=False):
    """Create, submit and export a scenario built from ``scenario_cfg``.

    Args:
        scenario_cfg: Scenario config used to create the scenario.
        export_path: Destination passed to ``tp.export_scenario``.
        override: Forwarded to ``tp.export_scenario``.
        include_data: Forwarded to ``tp.export_scenario``.

    Returns:
        The scenario that was created, submitted and exported.
    """
    created = tp.create_scenario(scenario_cfg)
    tp.submit(created)

    # The export happens after submission, so it covers the submitted scenario.
    tp.export_scenario(created.id, export_path, override, include_data)
    return created
def test_import_scenario_without_data(init_managers):
    """Importing an archive exported without data restores the scenario and its entities."""
    exported = export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY))

    # NOTE(review): init_managers presumably resets the managers' storage; the
    # assertion right below only confirms that no scenario is left.
    init_managers()
    assert _ScenarioManager._get_all() == []

    restored = tp.import_scenario("tmp.zip")

    # The import must yield exactly the scenario that was exported.
    assert _ScenarioManager._get_all() == [restored]
    assert restored == exported

    # Every entity attached to the scenario must come back with it.
    expected_counts = (
        (_CycleManager, 1),
        (_TaskManager, 4),
        (_DataManager, 5),
        (_JobManager, 4),
        (_SubmissionManager, 1),
        (_VersionManager, 1),
    )
    for manager, expected in expected_counts:
        assert len(manager._get_all()) == expected
def test_import_scenario_with_data(init_managers):
    """Importing an archive exported with data restores the data node files."""
    export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY), include_data=True)

    init_managers()
    assert _ScenarioManager._get_all() == []

    restored = tp.import_scenario("tmp.zip")

    # Each data node of the imported scenario must point at an existing file.
    for data_node in restored.data_nodes.values():
        assert os.path.exists(data_node.path)
def test_import_scenario_when_entities_are_already_existed_should_rollback(caplog):
    """A conflicting import is rolled back; ``override=True`` makes it succeed.

    Every exported entity except the submission is deleted, so the import
    collides with the remaining submission. The first import must raise and
    leave storage untouched; the second, with ``override=True``, must import
    everything.
    """
    export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY))
    caplog.clear()

    wiped_managers = (_CycleManager, _TaskManager, _DataManager, _JobManager)
    for manager in (*wiped_managers, _ScenarioManager):
        manager._delete_all()

    for manager in wiped_managers:
        assert len(manager._get_all()) == 0
    # The submission is kept on purpose: it is the entity that triggers the rollback.
    assert len(_SubmissionManager._get_all()) == 1
    submission_id = _SubmissionManager._get_all()[0].id
    assert len(_ScenarioManager._get_all()) == 0

    # Importing while an old entity still exists must fail.
    with pytest.raises(EntitiesToBeImportAlredyExist):
        tp.import_scenario("tmp.zip")

    assert {record.levelname for record in caplog.records} <= {"ERROR", "INFO"}
    assert "An error occurred during the import" in caplog.text
    assert f"{submission_id} already exists. Please use the 'override' parameter to override it" in caplog.text

    # Nothing may have been imported, and the pre-existing submission must remain.
    for manager in wiped_managers:
        assert len(manager._get_all()) == 0
    assert len(_SubmissionManager._get_all()) == 1
    assert len(_ScenarioManager._get_all()) == 0

    caplog.clear()

    # Second attempt, this time allowing existing entities to be replaced.
    tp.import_scenario("tmp.zip", override=True)

    assert {record.levelname for record in caplog.records} <= {"WARNING", "INFO"}
    assert f"{submission_id} already exists and will be overridden" in caplog.text

    # The scenario and all of its entities are now present.
    expected_counts = (
        (_ScenarioManager, 1),
        (_CycleManager, 1),
        (_TaskManager, 4),
        (_DataManager, 5),
        (_JobManager, 4),
        (_SubmissionManager, 1),
        (_VersionManager, 1),
    )
    for manager, expected in expected_counts:
        assert len(manager._get_all()) == expected
def test_import_incompatible_scenario(init_managers):
    """Importing into a configuration that changed after export raises a conflict error."""
    export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY))

    # Adding a data node config after the export makes the current
    # configuration differ from the exported version.
    Config.unblock_update()
    Config.configure_data_node("new_dn")

    with pytest.raises(ConflictedConfigurationError):
        tp.import_scenario("tmp.zip")
def test_import_a_non_exists_folder():
    """Importing from a path that does not exist raises ``FileNotFoundError``."""
    export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY))

    with pytest.raises(FileNotFoundError):
        tp.import_scenario("non_exists_folder")
def test_import_an_empty_archive(tmpdir_factory):
    """Importing a zip built from an empty folder raises the no-scenario error."""
    empty_dir = tmpdir_factory.mktemp("empty_folder")

    # Zip the empty folder as tmp.zip in the current working directory.
    shutil.make_archive("tmp", "zip", empty_dir.strpath)

    with pytest.raises(ImportArchiveDoesntContainAnyScenario):
        tp.import_scenario("tmp.zip")
def test_import_with_no_version(tmp_path):
    """Importing an archive whose ``version`` folder was removed raises an error."""
    export_test_scenario(configure_test_scenario(1, frequency=Frequency.DAILY))

    # Unpack the exported archive into the temporary directory.
    with zipfile.ZipFile("./tmp.zip") as exported_archive:
        exported_archive.extractall(tmp_path)

    # Drop the version folder, then rebuild tmp.zip from what is left.
    version_dir = f"{tmp_path}/version"
    shutil.rmtree(version_dir)
    shutil.make_archive("tmp", "zip", tmp_path)

    with pytest.raises(ImportScenarioDoesntHaveAVersion):
        tp.import_scenario("tmp.zip")
|