123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
- import pytest
- from taipy.config.common.scope import Scope
- from taipy.config.exceptions.exceptions import InvalidConfigurationId
- from taipy.core.data.data_node import DataNode
- from taipy.core.data.data_node_id import DataNodeId
- from taipy.core.data.generic import GenericDataNode
- from taipy.core.exceptions.exceptions import MissingReadFunction, MissingRequiredProperty, MissingWriteFunction
def read_fct():
    """Return the list currently bound to `TestGenericDataNode.data`.

    The list object itself is returned (no copy is made), so callers see any
    later in-place mutation of it.
    """
    shared_data = TestGenericDataNode.data
    return shared_data
def read_fct_with_args(inp):
    """Return a new list with `inp` added to each element of `TestGenericDataNode.data`.

    The shared list is only read; the result is a separate list.
    """
    shifted = []
    for value in TestGenericDataNode.data:
        shifted.append(value + inp)
    return shifted
def write_fct(data):
    """Append to `data`, in place, its current last element plus one.

    Returns None; the caller observes the change through the mutated list.
    """
    successor = data[-1] + 1
    data.append(successor)
def write_fct_with_args(data, inp):
    """Extend `data` in place with `inp` successive values, each one more than the previous last element.

    Returns None. When `inp` is 0 the list is left untouched.
    """
    for _step in range(inp):
        successor = data[-1] + 1
        data.append(successor)
def read_fct_modify_data_node_name(data_node_id: DataNodeId, name: str):
    """Look up a data node by id through `taipy.core.get`, rename it, and return it.

    Used as a `read_fct` whose side effect is to change the name of the very
    data node being read.

    Parameters:
        data_node_id: Identifier passed to `taipy.core.get`.
        name: New value assigned to the fetched node's `name` attribute.

    Returns:
        The fetched data node, after its name has been set.
    """
    import taipy.core as tp

    fetched = tp.get(data_node_id)
    assert isinstance(fetched, DataNode)
    fetched.name = name  # type:ignore
    return fetched
def reset_data():
    """Rebind `TestGenericDataNode.data` to a fresh list of the integers 0 through 9."""
    pristine = list(range(10))
    TestGenericDataNode.data = pristine
class TestGenericDataNode:
    """Tests for `GenericDataNode`: creation, user-property filtering, and read/write delegation.

    The module-level read/write helper functions operate on the class attribute
    `data`. Tests that mutate it restore it with `reset_data()` in a `finally`
    clause so a failing assertion cannot leak modified state into other tests.
    """

    # Shared list returned by `read_fct` / `read_fct_with_args` and mutated by the write helpers.
    data = list(range(10))

    @staticmethod
    def _assert_generic_dn(dn, config_id, name, expected_read_fct, expected_write_fct):
        """Assert the attributes every freshly created generic data node in `test_create` must have.

        Parameters:
            dn: The data node under inspection.
            config_id: Expected `config_id`.
            name: Expected `name`.
            expected_read_fct: Expected `properties["read_fct"]`, or None when it must be None.
            expected_write_fct: Expected `properties["write_fct"]`, or None when it must be None.
        """
        assert isinstance(dn, GenericDataNode)
        assert dn.storage_type() == "generic"
        assert dn.config_id == config_id
        assert dn.name == name
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.owner_id is None
        assert dn.last_edit_date is not None
        assert dn.job_ids == []
        assert dn.is_ready_for_reading
        if expected_read_fct is None:
            assert dn.properties["read_fct"] is None
        else:
            assert dn.properties["read_fct"] == expected_read_fct
        if expected_write_fct is None:
            assert dn.properties["write_fct"] is None
        else:
            assert dn.properties["write_fct"] == expected_write_fct

    def test_create(self):
        """Create nodes with every read/write function combination and check their attributes."""
        # Both functions provided.
        dn = GenericDataNode(
            "foo_bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct, "name": "super name"}
        )
        self._assert_generic_dn(dn, "foo_bar", "super name", read_fct, write_fct)

        # Write function explicitly None.
        dn_1 = GenericDataNode(
            "foo", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": None, "name": "foo"}
        )
        self._assert_generic_dn(dn_1, "foo", "foo", read_fct, None)

        # Read function explicitly None.
        dn_2 = GenericDataNode(
            "xyz", Scope.SCENARIO, properties={"read_fct": None, "write_fct": write_fct, "name": "xyz"}
        )
        self._assert_generic_dn(dn_2, "xyz", "xyz", None, write_fct)

        # Write function omitted: the property is still expected to exist and be None.
        dn_3 = GenericDataNode("xyz", Scope.SCENARIO, properties={"read_fct": read_fct, "name": "xyz"})
        self._assert_generic_dn(dn_3, "xyz", "xyz", read_fct, None)

        # Read function omitted: the property is still expected to exist and be None.
        dn_4 = GenericDataNode("xyz", Scope.SCENARIO, properties={"write_fct": write_fct, "name": "xyz"})
        self._assert_generic_dn(dn_4, "xyz", "xyz", None, write_fct)

        # A config id containing a space is rejected.
        with pytest.raises(InvalidConfigurationId):
            GenericDataNode("foo bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct})

    def test_get_user_properties(self):
        """Only properties that are not read/write functions or their arguments are user properties."""
        dn_1 = GenericDataNode(
            "dn_1",
            Scope.SCENARIO,
            properties={
                "read_fct": read_fct,
                "write_fct": write_fct,
                "read_fct_args": 1,
                "write_fct_args": 2,
                "foo": "bar",
            },
        )

        # read_fct, read_fct_args, write_fct, write_fct_args are filtered out
        assert dn_1._get_user_properties() == {"foo": "bar"}

    def test_create_with_missing_parameters(self):
        """Creating a node without properties, or with empty properties, raises `MissingRequiredProperty`."""
        with pytest.raises(MissingRequiredProperty):
            GenericDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"))
        with pytest.raises(MissingRequiredProperty):
            GenericDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties={})

    def test_read_write_generic_datanode(self):
        """Read and write through argument-less functions, including the missing-function error paths."""
        try:
            generic_dn = GenericDataNode(
                "foo", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct}
            )

            assert generic_dn.read() == self.data
            assert len(generic_dn.read()) == 10

            # write_fct appends one element to the shared list.
            generic_dn.write(self.data)
            assert generic_dn.read() == self.data
            assert len(generic_dn.read()) == 11

            generic_dn_1 = GenericDataNode("bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": None})

            assert generic_dn_1.read() == self.data
            assert len(generic_dn_1.read()) == 11

            with pytest.raises(MissingWriteFunction):
                generic_dn_1.write(self.data)

            generic_dn_2 = GenericDataNode("xyz", Scope.SCENARIO, properties={"read_fct": None, "write_fct": write_fct})

            generic_dn_2.write(self.data)
            assert len(self.data) == 12

            with pytest.raises(MissingReadFunction):
                generic_dn_2.read()

            generic_dn_3 = GenericDataNode("bar", Scope.SCENARIO, properties={"read_fct": None, "write_fct": None})

            with pytest.raises(MissingReadFunction):
                generic_dn_3.read()
            with pytest.raises(MissingWriteFunction):
                generic_dn_3.write(self.data)
        finally:
            # Restore the shared list even when an assertion above fails.
            reset_data()

    def test_read_write_generic_datanode_with_arguments(self):
        """Read and write through functions whose extra arguments are given as lists."""
        try:
            generic_dn = GenericDataNode(
                "foo",
                Scope.SCENARIO,
                properties={
                    "read_fct": read_fct_with_args,
                    "write_fct": write_fct_with_args,
                    "read_fct_args": [1],
                    "write_fct_args": [2],
                },
            )

            # read_fct_with_args adds 1 to every shared element.
            assert all(a + 1 == b for a, b in zip(self.data, generic_dn.read()))
            assert len(generic_dn.read()) == 10

            # write_fct_with_args appends 2 elements to the shared list.
            generic_dn.write(self.data)
            assert len(generic_dn.read()) == 12
        finally:
            # Restore the shared list even when an assertion above fails.
            reset_data()

    def test_read_write_generic_datanode_with_non_list_arguments(self):
        """Read and write through functions whose extra arguments are given as bare scalars."""
        try:
            generic_dn = GenericDataNode(
                "foo",
                Scope.SCENARIO,
                properties={
                    "read_fct": read_fct_with_args,
                    "write_fct": write_fct_with_args,
                    "read_fct_args": 1,
                    "write_fct_args": 2,
                },
            )

            # read_fct_with_args adds 1 to every shared element.
            assert all(a + 1 == b for a, b in zip(self.data, generic_dn.read()))
            assert len(generic_dn.read()) == 10

            # write_fct_with_args appends 2 elements to the shared list.
            generic_dn.write(self.data)
            assert len(generic_dn.read()) == 12
        finally:
            # Restore the shared list even when an assertion above fails.
            reset_data()

    def test_save_data_node_when_read(self):
        """A rename performed by the read function is visible on the node after `read()`."""
        generic_dn = GenericDataNode(
            "foo", Scope.SCENARIO, properties={"read_fct": read_fct_modify_data_node_name, "write_fct": write_fct}
        )
        # The read function needs the node's own id, which only exists after construction.
        generic_dn._properties["read_fct_args"] = (generic_dn.id, "bar")
        generic_dn.read()
        assert generic_dn.name == "bar"
|