test_generic_data_node.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import pytest
  12. from taipy import Scope
  13. from taipy.common.config import Config
  14. from taipy.common.config.exceptions.exceptions import InvalidConfigurationId
  15. from taipy.core.data._data_manager_factory import _DataManagerFactory
  16. from taipy.core.data.data_node import DataNode
  17. from taipy.core.data.data_node_id import DataNodeId
  18. from taipy.core.data.generic import GenericDataNode
  19. from taipy.core.exceptions.exceptions import MissingReadFunction, MissingRequiredProperty, MissingWriteFunction
  20. def read_fct():
  21. return TestGenericDataNode.data
  22. def read_fct_with_args(inp):
  23. return [i + inp for i in TestGenericDataNode.data]
  24. def write_fct(data):
  25. data.append(data[-1] + 1)
  26. def write_fct_with_args(data, inp):
  27. for _ in range(inp):
  28. data.append(data[-1] + 1)
  29. def read_fct_modify_data_node_name(data_node_id: DataNodeId, name: str):
  30. import taipy.core as tp
  31. data_node = tp.get(data_node_id)
  32. assert isinstance(data_node, DataNode)
  33. data_node.name = name # type:ignore
  34. return data_node
  35. def reset_data():
  36. TestGenericDataNode.data = list(range(10))
  37. class TestGenericDataNode:
  38. data = list(range(10))
  39. def test_create_with_both_read_fct_and_write_fct(self):
  40. data_manager = _DataManagerFactory._build_manager()
  41. generic_dn_config = Config.configure_generic_data_node(
  42. id="foo_bar", read_fct=read_fct, write_fct=write_fct, name="super name"
  43. )
  44. dn = data_manager._create(generic_dn_config, None, None)
  45. assert isinstance(dn, GenericDataNode)
  46. assert dn.storage_type() == "generic"
  47. assert dn.config_id == "foo_bar"
  48. assert dn.name == "super name"
  49. assert dn.scope == Scope.SCENARIO
  50. assert dn.id is not None
  51. assert dn.owner_id is None
  52. assert dn.last_edit_date is not None
  53. assert dn.job_ids == []
  54. assert dn.is_ready_for_reading
  55. assert dn.properties["read_fct"] == read_fct
  56. assert dn.properties["write_fct"] == write_fct
  57. with pytest.raises(InvalidConfigurationId):
  58. GenericDataNode("foo bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct})
  59. def test_create_with_read_fct_and_none_write_fct(self):
  60. data_manager = _DataManagerFactory._build_manager()
  61. generic_dn_config = Config.configure_generic_data_node(id="foo", read_fct=read_fct, write_fct=None, name="foo")
  62. dn = data_manager._create(generic_dn_config, None, None)
  63. assert isinstance(dn, GenericDataNode)
  64. assert dn.storage_type() == "generic"
  65. assert dn.config_id == "foo"
  66. assert dn.name == "foo"
  67. assert dn.scope == Scope.SCENARIO
  68. assert dn.id is not None
  69. assert dn.owner_id is None
  70. assert dn.last_edit_date is not None
  71. assert dn.job_ids == []
  72. assert dn.is_ready_for_reading
  73. assert dn.properties["read_fct"] == read_fct
  74. assert dn.properties["write_fct"] is None
  75. def test_create_with_write_fct_and_none_read_fct(self):
  76. data_manager = _DataManagerFactory._build_manager()
  77. generic_dn_config = Config.configure_generic_data_node(id="xyz", read_fct=None, write_fct=write_fct, name="xyz")
  78. dn = data_manager._create(generic_dn_config, None, None)
  79. assert isinstance(dn, GenericDataNode)
  80. assert dn.storage_type() == "generic"
  81. assert dn.config_id == "xyz"
  82. assert dn.name == "xyz"
  83. assert dn.scope == Scope.SCENARIO
  84. assert dn.id is not None
  85. assert dn.owner_id is None
  86. assert dn.last_edit_date is not None
  87. assert dn.job_ids == []
  88. assert dn.is_ready_for_reading
  89. assert dn.properties["read_fct"] is None
  90. assert dn.properties["write_fct"] == write_fct
  91. def test_create_with_read_fct(self):
  92. data_manager = _DataManagerFactory._build_manager()
  93. generic_dn_config = Config.configure_generic_data_node(id="acb", read_fct=read_fct, name="acb")
  94. dn = data_manager._create(generic_dn_config, None, None)
  95. assert isinstance(dn, GenericDataNode)
  96. assert dn.storage_type() == "generic"
  97. assert dn.config_id == "acb"
  98. assert dn.name == "acb"
  99. assert dn.scope == Scope.SCENARIO
  100. assert dn.id is not None
  101. assert dn.owner_id is None
  102. assert dn.last_edit_date is not None
  103. assert dn.job_ids == []
  104. assert dn.is_ready_for_reading
  105. assert dn.properties["read_fct"] == read_fct
  106. assert dn.properties["write_fct"] is None
  107. def test_create_with_write_fct(self):
  108. data_manager = _DataManagerFactory._build_manager()
  109. generic_dn_config = Config.configure_generic_data_node(id="mno", write_fct=write_fct, name="mno")
  110. dn = data_manager._create(generic_dn_config, None, None)
  111. assert isinstance(dn, GenericDataNode)
  112. assert dn.storage_type() == "generic"
  113. assert dn.config_id == "mno"
  114. assert dn.name == "mno"
  115. assert dn.scope == Scope.SCENARIO
  116. assert dn.id is not None
  117. assert dn.owner_id is None
  118. assert dn.last_edit_date is not None
  119. assert dn.job_ids == []
  120. assert dn.is_ready_for_reading
  121. assert dn.properties["read_fct"] is None
  122. assert dn.properties["write_fct"] == write_fct
  123. def test_get_user_properties(self):
  124. dn_1 = GenericDataNode(
  125. "dn_1",
  126. Scope.SCENARIO,
  127. properties={
  128. "read_fct": read_fct,
  129. "write_fct": write_fct,
  130. "read_fct_args": 1,
  131. "write_fct_args": 2,
  132. "foo": "bar",
  133. },
  134. )
  135. # read_fct, read_fct_args, write_fct, write_fct_args are filtered out
  136. assert dn_1._get_user_properties() == {"foo": "bar"}
  137. def test_create_with_missing_parameters(self):
  138. with pytest.raises(MissingRequiredProperty):
  139. GenericDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"))
  140. with pytest.raises(MissingRequiredProperty):
  141. GenericDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties={})
  142. def test_read_write_generic_datanode(self):
  143. generic_dn = GenericDataNode("foo", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": write_fct})
  144. _DataManagerFactory._build_manager()._repository._save(generic_dn)
  145. assert generic_dn.read() == self.data
  146. assert len(generic_dn.read()) == 10
  147. generic_dn.write(self.data)
  148. assert generic_dn.read() == self.data
  149. assert len(generic_dn.read()) == 11
  150. generic_dn_1 = GenericDataNode("bar", Scope.SCENARIO, properties={"read_fct": read_fct, "write_fct": None})
  151. assert generic_dn_1.read() == self.data
  152. assert len(generic_dn_1.read()) == 11
  153. with pytest.raises(MissingWriteFunction):
  154. generic_dn_1.write(self.data)
  155. generic_dn_2 = GenericDataNode("xyz", Scope.SCENARIO, properties={"read_fct": None, "write_fct": write_fct})
  156. _DataManagerFactory._build_manager()._repository._save(generic_dn_2)
  157. generic_dn_2.write(self.data)
  158. assert len(self.data) == 12
  159. with pytest.raises(MissingReadFunction):
  160. generic_dn_2.read()
  161. generic_dn_3 = GenericDataNode("bar", Scope.SCENARIO, properties={"read_fct": None, "write_fct": None})
  162. _DataManagerFactory._build_manager()._repository._save(generic_dn_3)
  163. with pytest.raises(MissingReadFunction):
  164. generic_dn_3.read()
  165. with pytest.raises(MissingWriteFunction):
  166. generic_dn_3.write(self.data)
  167. reset_data()
  168. def test_read_write_generic_datanode_with_arguments(self):
  169. generic_dn = GenericDataNode(
  170. "foo",
  171. Scope.SCENARIO,
  172. properties={
  173. "read_fct": read_fct_with_args,
  174. "write_fct": write_fct_with_args,
  175. "read_fct_args": [1],
  176. "write_fct_args": [2],
  177. },
  178. )
  179. _DataManagerFactory._build_manager()._repository._save(generic_dn)
  180. assert all(a + 1 == b for a, b in zip(self.data, generic_dn.read()))
  181. assert len(generic_dn.read()) == 10
  182. generic_dn.write(self.data)
  183. assert len(generic_dn.read()) == 12
  184. reset_data()
  185. def test_read_write_generic_datanode_with_non_list_arguments(self):
  186. generic_dn = GenericDataNode(
  187. "foo",
  188. Scope.SCENARIO,
  189. properties={
  190. "read_fct": read_fct_with_args,
  191. "write_fct": write_fct_with_args,
  192. "read_fct_args": 1,
  193. "write_fct_args": 2,
  194. },
  195. )
  196. _DataManagerFactory._build_manager()._repository._save(generic_dn)
  197. assert all(a + 1 == b for a, b in zip(self.data, generic_dn.read()))
  198. assert len(generic_dn.read()) == 10
  199. generic_dn.write(self.data)
  200. assert len(generic_dn.read()) == 12
  201. reset_data()
  202. def test_save_data_node_when_read(self):
  203. generic_dn = GenericDataNode(
  204. "foo", Scope.SCENARIO, properties={"read_fct": read_fct_modify_data_node_name, "write_fct": write_fct}
  205. )
  206. _DataManagerFactory._build_manager()._repository._save(generic_dn)
  207. generic_dn._properties["read_fct_args"] = (generic_dn.id, "bar")
  208. generic_dn.read()
  209. assert generic_dn.name == "bar"