# test_import.py
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
  11. import os
  12. import shutil
  13. import pandas as pd
  14. import pytest
  15. import taipy.core.taipy as tp
  16. from taipy import Config, Frequency, Scope
  17. from taipy.core._version._version_manager import _VersionManager
  18. from taipy.core.cycle._cycle_manager import _CycleManager
  19. from taipy.core.data._data_manager import _DataManager
  20. from taipy.core.exceptions.exceptions import (
  21. ConflictedConfigurationError,
  22. EntitiesToBeImportAlredyExist,
  23. ImportFolderDoesntContainAnyScenario,
  24. ImportScenarioDoesntHaveAVersion,
  25. )
  26. from taipy.core.job._job_manager import _JobManager
  27. from taipy.core.scenario._scenario_manager import _ScenarioManager
  28. from taipy.core.submission._submission_manager import _SubmissionManager
  29. from taipy.core.task._task_manager import _TaskManager
  30. @pytest.fixture(scope="function", autouse=True)
  31. def clean_tmp_folder():
  32. shutil.rmtree("./tmp", ignore_errors=True)
  33. yield
  34. shutil.rmtree("./tmp", ignore_errors=True)
  35. def plus_1(x):
  36. return x + 1
  37. def plus_1_dataframe(x):
  38. return pd.DataFrame({"output": [x + 1]})
  39. def configure_test_scenario(input_data, frequency=None):
  40. input_cfg = Config.configure_data_node(
  41. id=f"i_{input_data}", storage_type="pickle", scope=Scope.SCENARIO, default_data=input_data
  42. )
  43. csv_output_cfg = Config.configure_data_node(id=f"o_{input_data}_csv", storage_type="csv")
  44. excel_output_cfg = Config.configure_data_node(id=f"o_{input_data}_excel", storage_type="excel")
  45. parquet_output_cfg = Config.configure_data_node(id=f"o_{input_data}_parquet", storage_type="parquet")
  46. json_output_cfg = Config.configure_data_node(id=f"o_{input_data}_json", storage_type="json")
  47. csv_task_cfg = Config.configure_task(f"t_{input_data}_csv", plus_1_dataframe, input_cfg, csv_output_cfg)
  48. excel_task_cfg = Config.configure_task(f"t_{input_data}_excel", plus_1_dataframe, input_cfg, excel_output_cfg)
  49. parquet_task_cfg = Config.configure_task(f"t_{input_data}_parquet", plus_1_dataframe, input_cfg, parquet_output_cfg)
  50. json_task_cfg = Config.configure_task(f"t_{input_data}_json", plus_1, input_cfg, json_output_cfg)
  51. scenario_cfg = Config.configure_scenario(
  52. id=f"s_{input_data}",
  53. task_configs=[csv_task_cfg, excel_task_cfg, parquet_task_cfg, json_task_cfg],
  54. frequency=frequency,
  55. )
  56. return scenario_cfg
  57. def export_test_scenario(scenario_cfg, folder_path="./tmp/exp_scenario", override=False, include_data=False):
  58. scenario = tp.create_scenario(scenario_cfg)
  59. tp.submit(scenario)
  60. # Export the submitted scenario
  61. tp.export_scenario(scenario.id, folder_path, override, include_data)
  62. return scenario
  63. def test_import_scenario_without_data(init_managers):
  64. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  65. scenario = export_test_scenario(scenario_cfg)
  66. init_managers()
  67. assert _ScenarioManager._get_all() == []
  68. imported_scenario = tp.import_scenario("./tmp/exp_scenario")
  69. # The imported scenario should be the same as the exported scenario
  70. assert _ScenarioManager._get_all() == [imported_scenario]
  71. assert imported_scenario == scenario
  72. # All entities belonging to the scenario should be imported
  73. assert len(_CycleManager._get_all()) == 1
  74. assert len(_TaskManager._get_all()) == 4
  75. assert len(_DataManager._get_all()) == 5
  76. assert len(_JobManager._get_all()) == 4
  77. assert len(_SubmissionManager._get_all()) == 1
  78. assert len(_VersionManager._get_all()) == 1
  79. def test_import_scenario_with_data(init_managers):
  80. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  81. export_test_scenario(scenario_cfg, include_data=True)
  82. init_managers()
  83. assert _ScenarioManager._get_all() == []
  84. imported_scenario = tp.import_scenario("./tmp/exp_scenario")
  85. # All data of all data nodes should be imported
  86. assert all(os.path.exists(dn.path) for dn in imported_scenario.data_nodes.values())
  87. def test_import_scenario_when_entities_are_already_existed(caplog):
  88. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  89. export_test_scenario(scenario_cfg)
  90. caplog.clear()
  91. # Import the scenario when the old entities still exist
  92. with pytest.raises(EntitiesToBeImportAlredyExist):
  93. tp.import_scenario("./tmp/exp_scenario")
  94. assert all(log.levelname == "ERROR" for log in caplog.records[1:])
  95. caplog.clear()
  96. # Import with override flag
  97. assert len(_ScenarioManager._get_all()) == 1
  98. tp.import_scenario("./tmp/exp_scenario", override=True)
  99. assert all(log.levelname == "WARNING" for log in caplog.records[1:])
  100. # The scenario is overridden
  101. assert len(_ScenarioManager._get_all()) == 1
  102. def test_import_incompatible_scenario(init_managers):
  103. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  104. export_test_scenario(scenario_cfg)
  105. Config.unblock_update()
  106. # Configure a new dn to make the exported version incompatible
  107. Config.configure_data_node("new_dn")
  108. with pytest.raises(ConflictedConfigurationError):
  109. tp.import_scenario("./tmp/exp_scenario")
  110. def test_import_a_non_exists_folder():
  111. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  112. export_test_scenario(scenario_cfg)
  113. with pytest.raises(FileNotFoundError):
  114. tp.import_scenario("non_exists_folder")
  115. def test_import_an_empty_folder(tmpdir_factory):
  116. empty_folder = tmpdir_factory.mktemp("empty_folder").strpath
  117. with pytest.raises(ImportFolderDoesntContainAnyScenario):
  118. tp.import_scenario(empty_folder)
  119. def test_import_with_no_version():
  120. scenario_cfg = configure_test_scenario(1, frequency=Frequency.DAILY)
  121. export_test_scenario(scenario_cfg)
  122. shutil.rmtree("./tmp/exp_scenario/version")
  123. with pytest.raises(ImportScenarioDoesntHaveAVersion):
  124. tp.import_scenario("./tmp/exp_scenario")