test_data_repositories.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import pytest
  13. from taipy.core.data._data_fs_repository import _DataFSRepository
  14. from taipy.core.data.data_node import DataNode, DataNodeId
  15. from taipy.core.exceptions import ModelNotFound
  16. class TestDataNodeRepository:
  17. @pytest.mark.parametrize("repo", [_DataFSRepository])
  18. def test_save_and_load(self, data_node: DataNode, repo):
  19. repository = repo()
  20. repository._save(data_node)
  21. loaded_data_node = repository._load(data_node.id)
  22. assert isinstance(loaded_data_node, DataNode)
  23. assert data_node.id == loaded_data_node.id
  24. assert data_node._config_id == loaded_data_node._config_id
  25. assert data_node._owner_id == loaded_data_node._owner_id
  26. assert data_node._parent_ids == loaded_data_node._parent_ids
  27. assert data_node._scope == loaded_data_node._scope
  28. assert data_node._last_edit_date == loaded_data_node._last_edit_date
  29. assert data_node._edit_in_progress == loaded_data_node._edit_in_progress
  30. assert data_node._version == loaded_data_node._version
  31. assert data_node._validity_period == loaded_data_node._validity_period
  32. assert data_node._editor_id == loaded_data_node._editor_id
  33. assert data_node._editor_expiration_date == loaded_data_node._editor_expiration_date
  34. assert data_node._edits == loaded_data_node._edits
  35. assert data_node._properties == loaded_data_node._properties
  36. @pytest.mark.parametrize("repo", [_DataFSRepository])
  37. def test_exists(self, data_node, repo):
  38. repository = repo()
  39. repository._save(data_node)
  40. assert repository._exists(data_node.id)
  41. assert not repository._exists("not-existed-data-node")
  42. @pytest.mark.parametrize("repo", [_DataFSRepository])
  43. def test_load_all(self, data_node, repo):
  44. repository = repo()
  45. for i in range(10):
  46. data_node.id = DataNodeId(f"data_node-{i}")
  47. repository._save(data_node)
  48. data_nodes = repository._load_all()
  49. assert len(data_nodes) == 10
  50. @pytest.mark.parametrize("repo", [_DataFSRepository])
  51. def test_load_all_with_filters(self, data_node, repo):
  52. repository = repo()
  53. for i in range(10):
  54. data_node.id = DataNodeId(f"data_node-{i}")
  55. data_node._owner_id = f"task-{i}"
  56. repository._save(data_node)
  57. objs = repository._load_all(filters=[{"owner_id": "task-2"}])
  58. assert len(objs) == 1
  59. @pytest.mark.parametrize("repo", [_DataFSRepository])
  60. def test_delete(self, data_node, repo):
  61. repository = repo()
  62. repository._save(data_node)
  63. repository._delete(data_node.id)
  64. with pytest.raises(ModelNotFound):
  65. repository._load(data_node.id)
  66. @pytest.mark.parametrize("repo", [_DataFSRepository])
  67. def test_delete_all(self, data_node, repo):
  68. repository = repo()
  69. for i in range(10):
  70. data_node.id = DataNodeId(f"data_node-{i}")
  71. repository._save(data_node)
  72. assert len(repository._load_all()) == 10
  73. repository._delete_all()
  74. assert len(repository._load_all()) == 0
  75. @pytest.mark.parametrize("repo", [_DataFSRepository])
  76. def test_delete_many(self, data_node, repo):
  77. repository = repo()
  78. for i in range(10):
  79. data_node.id = DataNodeId(f"data_node-{i}")
  80. repository._save(data_node)
  81. objs = repository._load_all()
  82. assert len(objs) == 10
  83. ids = [x.id for x in objs[:3]]
  84. repository._delete_many(ids)
  85. assert len(repository._load_all()) == 7
  86. @pytest.mark.parametrize("repo", [_DataFSRepository])
  87. def test_delete_by(self, data_node, repo):
  88. repository = repo()
  89. # Create 5 entities with version 1.0 and 5 entities with version 2.0
  90. for i in range(10):
  91. data_node.id = DataNodeId(f"data_node-{i}")
  92. data_node._version = f"{(i+1) // 5}.0"
  93. repository._save(data_node)
  94. objs = repository._load_all()
  95. assert len(objs) == 10
  96. repository._delete_by("version", "1.0")
  97. assert len(repository._load_all()) == 5
  98. @pytest.mark.parametrize("repo", [_DataFSRepository])
  99. def test_search(self, data_node, repo):
  100. repository = repo()
  101. for i in range(10):
  102. data_node.id = DataNodeId(f"data_node-{i}")
  103. data_node._owner_id = f"task-{i}"
  104. repository._save(data_node)
  105. assert len(repository._load_all()) == 10
  106. objs = repository._search("owner_id", "task-2")
  107. assert len(objs) == 1
  108. assert isinstance(objs[0], DataNode)
  109. objs = repository._search("owner_id", "task-2", filters=[{"version": "random_version_number"}])
  110. assert len(objs) == 1
  111. assert isinstance(objs[0], DataNode)
  112. assert repository._search("owner_id", "task-2", filters=[{"version": "non_existed_version"}]) == []
  113. @pytest.mark.parametrize("repo", [_DataFSRepository])
  114. def test_export(self, tmpdir, data_node, repo):
  115. repository = repo()
  116. repository._save(data_node)
  117. repository._export(data_node.id, tmpdir.strpath)
  118. dir_path = repository.dir_path if repo == _DataFSRepository else os.path.join(tmpdir.strpath, "data_node")
  119. assert os.path.exists(os.path.join(dir_path, f"{data_node.id}.json"))