123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import os
- import pytest
- from taipy.core.data._data_fs_repository import _DataFSRepository
- from taipy.core.data.data_node import DataNode, DataNodeId
- from taipy.core.exceptions import ModelNotFound
- class TestDataNodeRepository:
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_save_and_load(self, data_node: DataNode, repo):
- repository = repo()
- repository._save(data_node)
- loaded_data_node = repository._load(data_node.id)
- assert isinstance(loaded_data_node, DataNode)
- assert data_node.id == loaded_data_node.id
- assert data_node._config_id == loaded_data_node._config_id
- assert data_node._owner_id == loaded_data_node._owner_id
- assert data_node._parent_ids == loaded_data_node._parent_ids
- assert data_node._scope == loaded_data_node._scope
- assert data_node._last_edit_date == loaded_data_node._last_edit_date
- assert data_node._edit_in_progress == loaded_data_node._edit_in_progress
- assert data_node._version == loaded_data_node._version
- assert data_node._validity_period == loaded_data_node._validity_period
- assert data_node._editor_id == loaded_data_node._editor_id
- assert data_node._editor_expiration_date == loaded_data_node._editor_expiration_date
- assert data_node._edits == loaded_data_node._edits
- assert data_node._properties == loaded_data_node._properties
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_exists(self, data_node, repo):
- repository = repo()
- repository._save(data_node)
- assert repository._exists(data_node.id)
- assert not repository._exists("not-existed-data-node")
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_load_all(self, data_node, repo):
- repository = repo()
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- repository._save(data_node)
- data_nodes = repository._load_all()
- assert len(data_nodes) == 10
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_load_all_with_filters(self, data_node, repo):
- repository = repo()
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- data_node._owner_id = f"task-{i}"
- repository._save(data_node)
- objs = repository._load_all(filters=[{"owner_id": "task-2"}])
- assert len(objs) == 1
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_delete(self, data_node, repo):
- repository = repo()
- repository._save(data_node)
- repository._delete(data_node.id)
- with pytest.raises(ModelNotFound):
- repository._load(data_node.id)
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_delete_all(self, data_node, repo):
- repository = repo()
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- repository._save(data_node)
- assert len(repository._load_all()) == 10
- repository._delete_all()
- assert len(repository._load_all()) == 0
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_delete_many(self, data_node, repo):
- repository = repo()
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- repository._save(data_node)
- objs = repository._load_all()
- assert len(objs) == 10
- ids = [x.id for x in objs[:3]]
- repository._delete_many(ids)
- assert len(repository._load_all()) == 7
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_delete_by(self, data_node, repo):
- repository = repo()
- # Create 5 entities with version 1.0 and 5 entities with version 2.0
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- data_node._version = f"{(i+1) // 5}.0"
- repository._save(data_node)
- objs = repository._load_all()
- assert len(objs) == 10
- repository._delete_by("version", "1.0")
- assert len(repository._load_all()) == 5
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_search(self, data_node, repo):
- repository = repo()
- for i in range(10):
- data_node.id = DataNodeId(f"data_node-{i}")
- data_node._owner_id = f"task-{i}"
- repository._save(data_node)
- assert len(repository._load_all()) == 10
- objs = repository._search("owner_id", "task-2")
- assert len(objs) == 1
- assert isinstance(objs[0], DataNode)
- objs = repository._search("owner_id", "task-2", filters=[{"version": "random_version_number"}])
- assert len(objs) == 1
- assert isinstance(objs[0], DataNode)
- assert repository._search("owner_id", "task-2", filters=[{"version": "non_existed_version"}]) == []
- @pytest.mark.parametrize("repo", [_DataFSRepository])
- def test_export(self, tmpdir, data_node, repo):
- repository = repo()
- repository._save(data_node)
- repository._export(data_node.id, tmpdir.strpath)
- dir_path = repository.dir_path if repo == _DataFSRepository else os.path.join(tmpdir.strpath, "data_node")
- assert os.path.exists(os.path.join(dir_path, f"{data_node.id}.json"))
|