123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from dataclasses import dataclass
- from datetime import datetime
- from unittest.mock import patch
- import mongomock
- import pymongo
- import pytest
- from bson import ObjectId
- from bson.errors import InvalidDocument
- from taipy import Scope
- from taipy.common.config import Config
- from taipy.core import MongoDefaultDocument
- from taipy.core.common._mongo_connector import _connect_mongodb
- from taipy.core.data._data_manager_factory import _DataManagerFactory
- from taipy.core.data.data_node_id import DataNodeId
- from taipy.core.data.mongo import MongoCollectionDataNode
- from taipy.core.data.operator import JoinOperator, Operator
- from taipy.core.exceptions.exceptions import InvalidCustomDocument, MissingRequiredProperty
- @pytest.fixture(scope="function", autouse=True)
- def clear_mongo_connection_cache():
- _connect_mongodb.cache_clear()
- @dataclass
- class CustomObjectWithoutArgs:
- def __init__(self, foo=None, bar=None):
- self.foo = foo
- self.bar = bar
- class CustomObjectWithCustomEncoder:
- def __init__(self, _id=None, integer=None, text=None, time=None):
- self.id = _id
- self.integer = integer
- self.text = text
- self.time = time
- def encode(self):
- return {"_id": self.id, "integer": self.integer, "text": self.text, "time": self.time.isoformat()}
- class CustomObjectWithCustomEncoderDecoder(CustomObjectWithCustomEncoder):
- @classmethod
- def decode(cls, data):
- return cls(data["_id"], data["integer"], data["text"], datetime.fromisoformat(data["time"]))
- class TestMongoCollectionDataNode:
- __properties = [
- {
- "db_username": "",
- "db_password": "",
- "db_name": "taipy",
- "collection_name": "foo",
- "custom_document": MongoDefaultDocument,
- "db_extra_args": {
- "ssl": "true",
- "retrywrites": "false",
- "maxIdleTimeMS": "120000",
- },
- }
- ]
- @pytest.mark.parametrize("properties", __properties)
- def test_create(self, properties):
- mongo_dn_config = Config.configure_mongo_collection_data_node("foo_bar", **properties)
- mongo_dn = _DataManagerFactory._build_manager()._create(mongo_dn_config, None, None)
- assert isinstance(mongo_dn, MongoCollectionDataNode)
- assert mongo_dn.storage_type() == "mongo_collection"
- assert mongo_dn.config_id == "foo_bar"
- assert mongo_dn.scope == Scope.SCENARIO
- assert mongo_dn.id is not None
- assert mongo_dn.owner_id is None
- assert mongo_dn.job_ids == []
- assert mongo_dn.is_ready_for_reading
- assert mongo_dn.custom_document == MongoDefaultDocument
- @pytest.mark.parametrize("properties", __properties)
- def test_get_user_properties(self, properties):
- custom_properties = properties.copy()
- custom_properties["foo"] = "bar"
- mongo_dn = MongoCollectionDataNode(
- "foo_bar",
- Scope.SCENARIO,
- properties=custom_properties,
- )
- assert mongo_dn._get_user_properties() == {"foo": "bar"}
- @pytest.mark.parametrize(
- "properties",
- [
- {},
- {"db_username": "foo"},
- {"db_username": "foo", "db_password": "foo"},
- {"db_username": "foo", "db_password": "foo", "db_name": "foo"},
- ],
- )
- def test_create_with_missing_parameters(self, properties):
- with pytest.raises(MissingRequiredProperty):
- MongoCollectionDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"))
- with pytest.raises(MissingRequiredProperty):
- MongoCollectionDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
- @pytest.mark.parametrize("properties", __properties)
- def test_raise_error_invalid_custom_document(self, properties):
- custom_properties = properties.copy()
- custom_properties["custom_document"] = "foo"
- with pytest.raises(InvalidCustomDocument):
- MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=custom_properties,
- )
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_read(self, properties):
- mock_client = pymongo.MongoClient("localhost")
- mock_client[properties["db_name"]][properties["collection_name"]].insert_many(
- [
- {"foo": "baz", "bar": "qux"},
- {"foo": "quux", "bar": "quuz"},
- {"foo": "corge"},
- {"bar": "grault"},
- {"KWARGS_KEY": "KWARGS_VALUE"},
- {},
- ]
- )
- mongo_dn = MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=properties,
- )
- data = mongo_dn.read()
- assert isinstance(data, list)
- assert isinstance(data[0], MongoDefaultDocument)
- assert isinstance(data[1], MongoDefaultDocument)
- assert isinstance(data[2], MongoDefaultDocument)
- assert isinstance(data[3], MongoDefaultDocument)
- assert isinstance(data[4], MongoDefaultDocument)
- assert isinstance(data[5], MongoDefaultDocument)
- assert isinstance(data[0]._id, ObjectId)
- assert data[0].foo == "baz"
- assert data[0].bar == "qux"
- assert isinstance(data[1]._id, ObjectId)
- assert data[1].foo == "quux"
- assert data[1].bar == "quuz"
- assert isinstance(data[2]._id, ObjectId)
- assert data[2].foo == "corge"
- assert isinstance(data[3]._id, ObjectId)
- assert data[3].bar == "grault"
- assert isinstance(data[4]._id, ObjectId)
- assert data[4].KWARGS_KEY == "KWARGS_VALUE"
- assert isinstance(data[5]._id, ObjectId)
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_read_empty_as(self, properties):
- mongo_dn = MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=properties,
- )
- data = mongo_dn.read()
- assert isinstance(data, list)
- assert len(data) == 0
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- @pytest.mark.parametrize(
- "data",
- [
- ([{"foo": 1, "a": 2}, {"foo": 3, "bar": 4}]),
- ({"a": 1, "bar": 2}),
- ],
- )
- def test_read_wrong_object_properties_name(self, properties, data):
- custom_properties = properties.copy()
- custom_properties["custom_document"] = CustomObjectWithoutArgs
- mongo_dn = MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=custom_properties,
- )
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- mongo_dn.write(data)
- with pytest.raises(TypeError):
- data = mongo_dn.read()
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- @pytest.mark.parametrize(
- "data",
- [
- ([{"foo": 11, "bar": 22}, {"foo": 33, "bar": 44}]),
- ({"foz": 1, "baz": 2}),
- ],
- )
- def test_append(self, properties, data):
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=properties)
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- mongo_dn.append(data)
- original_data = [{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}]
- mongo_dn.write(original_data)
- mongo_dn.append(data)
- assert len(mongo_dn.read()) == len(data if isinstance(data, list) else [data]) + len(original_data)
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- @pytest.mark.parametrize(
- "data,written_data",
- [
- ([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}], [{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}]),
- ({"foo": 1, "bar": 2}, [{"foo": 1, "bar": 2}]),
- ],
- )
- def test_write(self, properties, data, written_data):
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=properties)
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- mongo_dn.write(data)
- read_objects = mongo_dn.read()
- for read_object, written_dict in zip(read_objects, written_data):
- assert isinstance(read_object._id, ObjectId)
- assert read_object.foo == written_dict["foo"]
- assert read_object.bar == written_dict["bar"]
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- @pytest.mark.parametrize(
- "data",
- [
- [],
- ],
- )
- def test_write_empty_list(self, properties, data):
- mongo_dn = MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=properties,
- )
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- mongo_dn.write(data)
- assert len(mongo_dn.read()) == 0
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_write_non_serializable(self, properties):
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=properties)
- data = {"a": 1, "b": mongo_dn}
- with pytest.raises(InvalidDocument):
- mongo_dn.write(data)
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_write_custom_encoder(self, properties):
- custom_properties = properties.copy()
- custom_properties["custom_document"] = CustomObjectWithCustomEncoder
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=custom_properties)
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- data = [
- CustomObjectWithCustomEncoder("1", 1, "abc", datetime.now()),
- CustomObjectWithCustomEncoder("2", 2, "def", datetime.now()),
- ]
- mongo_dn.write(data)
- read_data = mongo_dn.read()
- assert isinstance(read_data[0], CustomObjectWithCustomEncoder)
- assert isinstance(read_data[1], CustomObjectWithCustomEncoder)
- assert read_data[0].id == "1"
- assert read_data[0].integer == 1
- assert read_data[0].text == "abc"
- assert isinstance(read_data[0].time, str)
- assert read_data[1].id == "2"
- assert read_data[1].integer == 2
- assert read_data[1].text == "def"
- assert isinstance(read_data[1].time, str)
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_write_custom_encoder_decoder(self, properties):
- custom_properties = properties.copy()
- custom_properties["custom_document"] = CustomObjectWithCustomEncoderDecoder
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=custom_properties)
- _DataManagerFactory._build_manager()._repository._save(mongo_dn)
- data = [
- CustomObjectWithCustomEncoderDecoder("1", 1, "abc", datetime.now()),
- CustomObjectWithCustomEncoderDecoder("2", 2, "def", datetime.now()),
- ]
- mongo_dn.write(data)
- read_data = mongo_dn.read()
- assert isinstance(read_data[0], CustomObjectWithCustomEncoderDecoder)
- assert isinstance(read_data[1], CustomObjectWithCustomEncoderDecoder)
- assert read_data[0].id == "1"
- assert read_data[0].integer == 1
- assert read_data[0].text == "abc"
- assert isinstance(read_data[0].time, datetime)
- assert read_data[1].id == "2"
- assert read_data[1].integer == 2
- assert read_data[1].text == "def"
- assert isinstance(read_data[1].time, datetime)
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_filter(self, properties):
- mock_client = pymongo.MongoClient("localhost")
- mock_client[properties["db_name"]][properties["collection_name"]].insert_many(
- [
- {"foo": 1, "bar": 1},
- {"foo": 1, "bar": 2},
- {"foo": 1},
- {"foo": 2, "bar": 2},
- {"bar": 2},
- {"KWARGS_KEY": "KWARGS_VALUE"},
- ]
- )
- mongo_dn = MongoCollectionDataNode(
- "foo",
- Scope.SCENARIO,
- properties=properties,
- )
- assert len(mongo_dn.filter(("foo", 1, Operator.EQUAL))) == 3
- assert len(mongo_dn.filter(("foo", 1, Operator.NOT_EQUAL))) == 3
- assert len(mongo_dn.filter(("bar", 2, Operator.EQUAL))) == 3
- assert len(mongo_dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)) == 4
- assert mongo_dn["foo"] == [1, 1, 1, 2, None, None]
- assert mongo_dn["bar"] == [1, 2, None, 2, 2, None]
- assert [m.__dict__ for m in mongo_dn[:3]] == [m.__dict__ for m in mongo_dn.read()[:3]]
- assert mongo_dn[["foo", "bar"]] == [
- {"foo": 1, "bar": 1},
- {"foo": 1, "bar": 2},
- {"foo": 1},
- {"foo": 2, "bar": 2},
- {"bar": 2},
- {},
- ]
- @mongomock.patch(servers=(("localhost", 27017),))
- @pytest.mark.parametrize("properties", __properties)
- def test_filter_does_not_read_all_entities(self, properties):
- mongo_dn = MongoCollectionDataNode("foo", Scope.SCENARIO, properties=properties)
- # MongoCollectionDataNode.filter() should not call the MongoCollectionDataNode._read() method
- with patch.object(MongoCollectionDataNode, "_read") as read_mock:
- mongo_dn.filter(("foo", 1, Operator.EQUAL))
- mongo_dn.filter(("bar", 2, Operator.NOT_EQUAL))
- mongo_dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
- assert read_mock["_read"].call_count == 0
|