test_scenario_repositories.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import pytest
  13. from taipy.core.exceptions import ModelNotFound
  14. from taipy.core.scenario._scenario_fs_repository import _ScenarioFSRepository
  15. from taipy.core.scenario._scenario_sql_repository import _ScenarioSQLRepository
  16. from taipy.core.scenario.scenario import Scenario, ScenarioId
  17. class TestScenarioFSRepository:
  18. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  19. def test_save_and_load(self, scenario, repo, init_sql_repo):
  20. repository = repo()
  21. repository._save(scenario)
  22. obj = repository._load(scenario.id)
  23. assert isinstance(obj, Scenario)
  24. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  25. def test_exists(self, scenario, repo, init_sql_repo):
  26. repository = repo()
  27. repository._save(scenario)
  28. assert repository._exists(scenario.id)
  29. assert not repository._exists("not-existed-scenario")
  30. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  31. def test_load_all(self, scenario, repo, init_sql_repo):
  32. repository = repo()
  33. for i in range(10):
  34. scenario.id = ScenarioId(f"scenario-{i}")
  35. repository._save(scenario)
  36. data_nodes = repository._load_all()
  37. assert len(data_nodes) == 10
  38. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  39. def test_load_all_with_filters(self, scenario, repo, init_sql_repo):
  40. repository = repo()
  41. for i in range(10):
  42. scenario.id = ScenarioId(f"scenario-{i}")
  43. repository._save(scenario)
  44. objs = repository._load_all(filters=[{"id": "scenario-2"}])
  45. assert len(objs) == 1
  46. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  47. def test_delete(self, scenario, repo, init_sql_repo):
  48. repository = repo()
  49. repository._save(scenario)
  50. repository._delete(scenario.id)
  51. with pytest.raises(ModelNotFound):
  52. repository._load(scenario.id)
  53. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  54. def test_delete_all(self, scenario, repo, init_sql_repo):
  55. repository = repo()
  56. for i in range(10):
  57. scenario.id = ScenarioId(f"scenario-{i}")
  58. repository._save(scenario)
  59. assert len(repository._load_all()) == 10
  60. repository._delete_all()
  61. assert len(repository._load_all()) == 0
  62. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  63. def test_delete_many(self, scenario, repo, init_sql_repo):
  64. repository = repo()
  65. for i in range(10):
  66. scenario.id = ScenarioId(f"scenario-{i}")
  67. repository._save(scenario)
  68. objs = repository._load_all()
  69. assert len(objs) == 10
  70. ids = [x.id for x in objs[:3]]
  71. repository._delete_many(ids)
  72. assert len(repository._load_all()) == 7
  73. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  74. def test_delete_by(self, scenario, repo, init_sql_repo):
  75. repository = repo()
  76. # Create 5 entities with version 1.0 and 5 entities with version 2.0
  77. for i in range(10):
  78. scenario.id = ScenarioId(f"scenario-{i}")
  79. scenario._version = f"{(i+1) // 5}.0"
  80. repository._save(scenario)
  81. objs = repository._load_all()
  82. assert len(objs) == 10
  83. repository._delete_by("version", "1.0")
  84. assert len(repository._load_all()) == 5
  85. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  86. def test_search(self, scenario, repo, init_sql_repo):
  87. repository = repo()
  88. for i in range(10):
  89. scenario.id = ScenarioId(f"scenario-{i}")
  90. repository._save(scenario)
  91. assert len(repository._load_all()) == 10
  92. objs = repository._search("id", "scenario-2")
  93. assert len(objs) == 1
  94. assert isinstance(objs[0], Scenario)
  95. objs = repository._search("id", "scenario-2", filters=[{"version": "random_version_number"}])
  96. assert len(objs) == 1
  97. assert isinstance(objs[0], Scenario)
  98. assert repository._search("id", "scenario-2", filters=[{"version": "non_existed_version"}]) == []
  99. @pytest.mark.parametrize("repo", [_ScenarioFSRepository, _ScenarioSQLRepository])
  100. def test_export(self, tmpdir, scenario, repo, init_sql_repo):
  101. repository = repo()
  102. repository._save(scenario)
  103. repository._export(scenario.id, tmpdir.strpath)
  104. dir_path = repository.dir_path if repo == _ScenarioFSRepository else os.path.join(tmpdir.strpath, "scenario")
  105. assert os.path.exists(os.path.join(dir_path, f"{scenario.id}.json"))