test_import.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import shutil
  13. import zipfile
  14. import pandas as pd
  15. import pytest
  16. import taipy.core.taipy as tp
  17. from taipy import Config, Frequency, Scope
  18. from taipy.core._version._version_manager import _VersionManager
  19. from taipy.core.cycle._cycle_manager import _CycleManager
  20. from taipy.core.data._data_manager import _DataManager
  21. from taipy.core.exceptions.exceptions import (
  22. ConflictedConfigurationError,
  23. EntitiesToBeImportAlredyExist,
  24. ImportArchiveDoesntContainAnyScenario,
  25. ImportScenarioDoesntHaveAVersion,
  26. )
  27. from taipy.core.job._job_manager import _JobManager
  28. from taipy.core.scenario._scenario_manager import _ScenarioManager
  29. from taipy.core.submission._submission_manager import _SubmissionManager
  30. from taipy.core.task._task_manager import _TaskManager
  31. @pytest.fixture(scope="function", autouse=True)
  32. def clean_export_zip_file():
  33. if os.path.exists("./tmp.zip"):
  34. os.remove("./tmp.zip")
  35. yield
  36. if os.path.exists("./tmp.zip"):
  37. os.remove("./tmp.zip")
  38. def plus_1(x):
  39. return x + 1
  40. def plus_1_dataframe(x):
  41. return pd.DataFrame({"output": [x + 1]})
  42. def configure_test_scenario(input_data, frequency=None):
  43. input_cfg = Config.configure_data_node(
  44. id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
  45. )
  46. csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
  47. excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
  48. parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
  49. json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
  50. csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
  51. excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
  52. parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
  53. json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
  54. scenario_cfg = Config.configure_scenario(
  55. id=f"s_{input_data}",
  56. task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
  57. frequency=frequency,
  58. )
  59. return scenario_cfg
  60. def export_test_scenario(scenario_cfg, export_path="tmp.zip", override=False, include_data=False):
  61. scenario = tp.create_scenario(scenario_cfg)
  62. tp.submit(scenario)
  63. # Export the submitted scenario
  64. tp.export_scenario(scenario.id, export_path, override, include_data)
  65. return scenario
  66. def test_import_scenario_without_data(init_managers):
  67. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  68. scenario = export_test_scenario(scenario_cfg)
  69. init_managers()
  70. assert _ScenarioManager._get_all() == []
  71. imported_scenario = tp.import_scenario("tmp.zip")
  72. # The imported scenario should be the same as the exported scenario
  73. assert _ScenarioManager._get_all() == [imported_scenario]
  74. assert imported_scenario == scenario
  75. # All entities belonging to the scenario should be imported
  76. assert len(_CycleManager._get_all()) == 1
  77. assert len(_TaskManager._get_all()) == 4
  78. assert len(_DataManager._get_all()) == 5
  79. assert len(_JobManager._get_all()) == 4
  80. assert len(_SubmissionManager._get_all()) == 1
  81. assert len(_VersionManager._get_all()) == 1
  82. def test_import_scenario_with_data(init_managers):
  83. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  84. export_test_scenario(scenario_cfg, include_data=True)
  85. init_managers()
  86. assert _ScenarioManager._get_all() == []
  87. imported_scenario = tp.import_scenario("tmp.zip")
  88. # All data of all data nodes should be imported
  89. assert all(os.path.exists(dn.path) for dn in imported_scenario.data_nodes.values())
  90. def test_import_scenario_when_entities_are_already_existed_should_rollback(caplog):
  91. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  92. export_test_scenario(scenario_cfg)
  93. caplog.clear()
  94. _CycleManager._delete_all()
  95. _TaskManager._delete_all()
  96. _DataManager._delete_all()
  97. _JobManager._delete_all()
  98. _ScenarioManager._delete_all()
  99. assert len(_CycleManager._get_all()) == 0
  100. assert len(_TaskManager._get_all()) == 0
  101. assert len(_DataManager._get_all()) == 0
  102. assert len(_JobManager._get_all()) == 0
  103. assert len(_SubmissionManager._get_all()) == 1 # Keep the submission entity to test the rollback
  104. submission_id = _SubmissionManager._get_all()[0].id
  105. assert len(_ScenarioManager._get_all()) == 0
  106. # Import the scenario when the old entities still exist should raise an error
  107. with pytest.raises(EntitiesToBeImportAlredyExist):
  108. tp.import_scenario("tmp.zip")
  109. assert all(log.levelname in ["ERROR", "INFO"] for log in caplog.records)
  110. assert "An error occurred during the import" in caplog.text
  111. assert f"{submission_id} already exists. Please use the 'override' parameter to override it" in caplog.text
  112. # No entity should be imported and the old entities should be kept
  113. assert len(_CycleManager._get_all()) == 0
  114. assert len(_TaskManager._get_all()) == 0
  115. assert len(_DataManager._get_all()) == 0
  116. assert len(_JobManager._get_all()) == 0
  117. assert len(_SubmissionManager._get_all()) == 1 # Keep the submission entity to test the rollback
  118. assert len(_ScenarioManager._get_all()) == 0
  119. caplog.clear()
  120. # Import with override flag
  121. tp.import_scenario("tmp.zip", override=True)
  122. assert all(log.levelname in ["WARNING", "INFO"] for log in caplog.records)
  123. assert f"{submission_id} already exists and will be overridden" in caplog.text
  124. # The scenario is imported and overridden the old one
  125. assert len(_ScenarioManager._get_all()) == 1
  126. assert len(_CycleManager._get_all()) == 1
  127. assert len(_TaskManager._get_all()) == 4
  128. assert len(_DataManager._get_all()) == 5
  129. assert len(_JobManager._get_all()) == 4
  130. assert len(_SubmissionManager._get_all()) == 1
  131. assert len(_VersionManager._get_all()) == 1
  132. def test_import_incompatible_scenario(init_managers):
  133. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  134. export_test_scenario(scenario_cfg)
  135. Config.unblock_update()
  136. # Configure a new dn to make the exported version incompatible
  137. Config.configure_data_node("new_dn")
  138. with pytest.raises(ConflictedConfigurationError):
  139. tp.import_scenario("tmp.zip")
  140. def test_import_a_non_exists_folder():
  141. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  142. export_test_scenario(scenario_cfg)
  143. with pytest.raises(FileNotFoundError):
  144. tp.import_scenario("non_exists_folder")
  145. def test_import_an_empty_archive(tmpdir_factory):
  146. empty_folder = tmpdir_factory.mktemp("empty_folder").strpath
  147. shutil.make_archive("tmp", "zip", empty_folder)
  148. with pytest.raises(ImportArchiveDoesntContainAnyScenario):
  149. tp.import_scenario("tmp.zip")
  150. def test_import_with_no_version(tmp_path):
  151. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  152. export_test_scenario(scenario_cfg)
  153. # Extract the zip,
  154. with zipfile.ZipFile("./tmp.zip") as zip_file:
  155. zip_file.extractall(tmp_path)
  156. # remove the version,
  157. shutil.rmtree(f"{tmp_path}/version")
  158. # and archive the scenario without the version again
  159. shutil.make_archive("tmp", "zip", tmp_path)
  160. with pytest.raises(ImportScenarioDoesntHaveAVersion):
  161. tp.import_scenario("tmp.zip")