
fixed parquet cannot write custom exposed type

Toan Quach, 1 year ago
parent
commit
f09a57bace

+ 8 - 6
taipy/core/data/parquet.py

@@ -158,6 +158,8 @@ class ParquetDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode)
             editor_expiration_date,
             **properties,
         )
+        _AbstractTabularDataNode.__init__(self, **properties)
+
         self._path = properties.get(self.__PATH_KEY, properties.get(self.__DEFAULT_PATH_KEY))
 
         if self._path and ".data" in self._path:
@@ -249,13 +251,13 @@ class ParquetDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode)
         }
         kwargs.update(self.properties[self.__WRITE_KWARGS_PROPERTY])
         kwargs.update(write_kwargs)
-        if isinstance(data, pd.DataFrame):
-            data.to_parquet(self._path, **kwargs)
+        if isinstance(data, pd.Series):
+            df = pd.DataFrame(data)
         else:
-            _df = pd.DataFrame(data)
-            # Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
-            _df.columns = _df.columns.astype(str)
-            _df.to_parquet(self._path, **kwargs)
+            df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)
+
+        df.columns = df.columns.astype(str)
+        df.to_parquet(self._path, **kwargs)
         self.track_edit(timestamp=datetime.now(), job_id=job_id)
 
     def read_with_kwargs(self, **read_kwargs):
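
With this change, `_write()` no longer special-cases only `pd.DataFrame`: a `pd.Series` is wrapped in a DataFrame, and any other input (lists of dicts, numpy arrays, or instances of a custom exposed type) is first converted through `_convert_data_to_dataframe()` using the node's configured exposed type, then written with string column names. A minimal usage sketch of what this enables, mirroring the new `test_write_custom_exposed_type` test added below; the `Record` class and the `records.parquet` path are illustrative only, not part of the commit:

from taipy.config.common.scope import Scope
from taipy.core.data.parquet import ParquetDataNode


class Record:  # hypothetical custom exposed type, equivalent to MyCustomObject in the tests
    def __init__(self, id, integer, text):
        self.id = id
        self.integer = integer
        self.text = text


dn = ParquetDataNode(
    "records", Scope.SCENARIO, properties={"path": "records.parquet", "exposed_type": Record}
)
# Before this fix, _write() passed such data straight to pd.DataFrame(...), which does not
# produce the expected columns for arbitrary objects; it now goes through
# _convert_data_to_dataframe() first, so the objects' attributes become the parquet columns.
dn.write([Record(0, 1, "hi"), Record(1, 2, "world")])
records = dn.read()  # read back as Record instances
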

+ 139 - 0
tests/core/data/test_filter_parquet_data_node.py

@@ -0,0 +1,139 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.operator import JoinOperator, Operator
+from taipy.core.data.parquet import ParquetDataNode
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+class TestFilterParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    def test_filter_pandas_exposed_type(self, parquet_file_path):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
+        dn.write(
+            [
+                {"foo": 1, "bar": 1},
+                {"foo": 1, "bar": 2},
+                {"foo": 1},
+                {"foo": 2, "bar": 2},
+                {"bar": 2},
+            ]
+        )
+
+        # Test datanode indexing and slicing
+        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
+        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
+        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
+
+        # Test filter data
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
+        filtered_by_indexing = dn[dn["foo"] == 1]
+        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
+        filtered_by_indexing = dn[dn["foo"] != 1]
+        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
+        filtered_by_indexing = dn[dn["bar"] == 2]
+        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
+        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
+        expected_data = pd.DataFrame(
+            [
+                {"foo": 1.0, "bar": 1.0},
+                {"foo": 1.0, "bar": 2.0},
+                {"foo": 2.0, "bar": 2.0},
+                {"bar": 2.0},
+            ]
+        )
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    def test_filter_numpy_exposed_type(self, parquet_file_path):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
+        dn.write(
+            [
+                [1, 1],
+                [1, 2],
+                [1, 3],
+                [2, 1],
+                [2, 2],
+                [2, 3],
+            ]
+        )
+
+        # Test datanode indexing and slicing
+        assert np.array_equal(dn[0], np.array([1, 1]))
+        assert np.array_equal(dn[1], np.array([1, 2]))
+        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
+        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
+
+        # Test filter data
+        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
+
+        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
+        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
+
+        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
+        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
+
+        assert np.array_equal(
+            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
+            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
+        )
+        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

+ 0 - 327
tests/core/data/test_parquet_data_node.py

@@ -15,21 +15,17 @@ from datetime import datetime
 from importlib import util
 from time import sleep
 
-import numpy as np
 import pandas as pd
 import pytest
-from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
 from taipy.config.exceptions.exceptions import InvalidConfigurationId
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.data_node_id import DataNodeId
-from taipy.core.data.operator import JoinOperator, Operator
 from taipy.core.data.parquet import ParquetDataNode
 from taipy.core.exceptions.exceptions import (
     InvalidExposedType,
-    NoData,
     UnknownCompressionAlgorithm,
     UnknownParquetEngine,
 )
@@ -143,81 +139,12 @@ class TestParquetDataNode:
         data_modin = parquet_data_node_as_modin.read()
         assert isinstance(data_modin, pd.DataFrame)
 
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_file(self, engine, parquet_file_path):
-        not_existing_parquet = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
-        )
-        with pytest.raises(NoData):
-            assert not_existing_parquet.read() is None
-            not_existing_parquet.read_or_raise()
-
-        df = pd.read_parquet(parquet_file_path)
-        # Create ParquetDataNode without exposed_type (Default is pandas.DataFrame)
-        parquet_data_node_as_pandas = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
-        )
-        data_pandas = parquet_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 2
-        assert data_pandas.equals(df)
-        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())
-
-        # Create ParquetDataNode with numpy exposed_type
-        parquet_data_node_as_numpy = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
-        )
-        data_numpy = parquet_data_node_as_numpy.read()
-        assert isinstance(data_numpy, np.ndarray)
-        assert len(data_numpy) == 2
-        assert np.array_equal(data_numpy, df.to_numpy())
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_folder(self, engine):
-        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
-
-        df = pd.read_parquet(parquet_folder_path)
-        parquet_data_node_as_pandas = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
-        )
-        data_pandas = parquet_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 5
-        assert data_pandas.equals(df)
-        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())
-
     def test_set_path(self):
         dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"})
         assert dn.path == "foo.parquet"
         dn.path = "bar.parquet"
         assert dn.path == "bar.parquet"
 
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_write_after_modify_path(self, engine):
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
-        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
-        read_data = dn.read()
-        assert read_data is not None
-        dn.path = new_path
-        with pytest.raises(FileNotFoundError):
-            dn.read()
-        dn.write(read_data)
-        assert dn.read().equals(read_data)
-
-    def test_read_custom_exposed_type(self):
-        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
-
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
-        )
-        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())
-
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
-        )
-        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())
-
     def test_raise_error_unknown_parquet_engine(self):
         path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
         with pytest.raises(UnknownParquetEngine):
@@ -233,23 +160,6 @@ class TestParquetDataNode:
         with pytest.raises(InvalidExposedType):
             ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"})
 
-    def test_read_empty_data(self, tmpdir_factory):
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        empty_df = pd.DataFrame([])
-        empty_df.to_parquet(temp_file_path)
-
-        # Pandas
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
-        assert dn.read().equals(empty_df)
-
-        # Numpy
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
-        assert np.array_equal(dn.read(), empty_df.to_numpy())
-
-        # Custom
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
-        assert dn.read() == []
-
     def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
         temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
         pd.DataFrame([]).to_parquet(temp_file_path)
@@ -297,243 +207,6 @@ class TestParquetDataNode:
 
         os.unlink(temp_file_path)
 
-    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
-    @pytest.mark.parametrize(
-        "content",
-        [
-            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
-            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
-        ],
-    )
-    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
-        assert_frame_equal(dn.read(), default_data_frame)
-
-        dn.append(content)
-        assert_frame_equal(
-            dn.read(),
-            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
-        )
-
-    @pytest.mark.parametrize(
-        "data",
-        [
-            [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}],
-            pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
-        ],
-    )
-    def test_write_to_disk(self, tmpdir_factory, data):
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
-        dn.write(data)
-
-        assert pathlib.Path(temp_file_path).exists()
-        assert isinstance(dn.read(), pd.DataFrame)
-
-    def test_filter_pandas_exposed_type(self, parquet_file_path):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
-        dn.write(
-            [
-                {"foo": 1, "bar": 1},
-                {"foo": 1, "bar": 2},
-                {"foo": 1},
-                {"foo": 2, "bar": 2},
-                {"bar": 2},
-            ]
-        )
-
-        # Test datanode indexing and slicing
-        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
-        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
-        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
-
-        # Test filter data
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["foo"] == 1]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
-        filtered_by_indexing = dn[dn["foo"] != 1]
-        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["bar"] == 2]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
-        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
-        expected_data = pd.DataFrame(
-            [
-                {"foo": 1.0, "bar": 1.0},
-                {"foo": 1.0, "bar": 2.0},
-                {"foo": 2.0, "bar": 2.0},
-                {"bar": 2.0},
-            ]
-        )
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-    def test_filter_numpy_exposed_type(self, parquet_file_path):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
-        dn.write(
-            [
-                [1, 1],
-                [1, 2],
-                [1, 3],
-                [2, 1],
-                [2, 2],
-                [2, 3],
-            ]
-        )
-
-        # Test datanode indexing and slicing
-        assert np.array_equal(dn[0], np.array([1, 1]))
-        assert np.array_equal(dn[1], np.array([1, 2]))
-        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
-        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
-
-        # Test filter data
-        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
-
-        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
-        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
-
-        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
-        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
-
-        assert np.array_equal(
-            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
-            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
-        )
-        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
-        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
-        )
-
-        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
-        dn.write(df)
-
-        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
-        assert set(dn.read().columns) == set(read_kwargs["columns"])
-
-        # !!! filter doesn't work with `fastparquet` without partition_cols
-        if engine == "pyarrow":
-            assert len(dn.read()) != len(df)
-            assert len(dn.read()) == 2
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
-        # Precedence:
-        # 1. Class read/write methods
-        # 2. Defined in read_kwargs and write_kwargs, in properties
-        # 3. Defined top-level in properties
-
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
-        df = default_data_frame.copy(deep=True)
-
-        # Write
-        # 3
-        comp3 = "snappy"
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
-        )
-        dn.write(df)
-        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # 3 and 2
-        comp2 = "gzip"
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={
-                "path": temp_file_path,
-                "engine": engine,
-                "compression": comp3,
-                "write_kwargs": {"compression": comp2},
-            },
-        )
-        dn.write(df)
-        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # 3, 2 and 1
-        comp1 = "brotli"
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={
-                "path": temp_file_path,
-                "engine": engine,
-                "compression": comp3,
-                "write_kwargs": {"compression": comp2},
-            },
-        )
-        dn.write_with_kwargs(df, compression=comp1)
-        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # Read
-        df.to_parquet(temp_file_path, engine=engine)
-        # 2
-        cols2 = ["a", "b"]
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
-        )
-        assert set(dn.read().columns) == set(cols2)
-
-        # 1
-        cols1 = ["a"]
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
-        )
-        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)
-
-    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
-        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))
-
-        write_kwargs = {"partition_cols": ["a", "b"]}
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs})  # type: ignore
-        dn.write(default_data_frame)
-
-        assert pathlib.Path(temp_dir_path).is_dir()
-        # dtypes change during round-trip with partition_cols
-        pd.testing.assert_frame_equal(
-            dn.read().sort_index(axis=1),
-            default_data_frame.sort_index(axis=1),
-            check_dtype=False,
-            check_categorical=False,
-        )
-
-    def test_read_with_kwargs_never_written(self):
-        path = "data/node/path"
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
-        assert dn.read_with_kwargs() is None
-
     def test_migrate_to_new_path(self, tmp_path):
         _base_path = os.path.join(tmp_path, ".data")
         path = os.path.join(_base_path, "test.parquet")

+ 188 - 0
tests/core/data/test_read_parquet_data_node.py

@@ -0,0 +1,188 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.parquet import ParquetDataNode
+from taipy.core.exceptions.exceptions import NoData
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+class MyCustomXYObject:
+    def __init__(self, x, y):
+        self.x = x
+        self.y = y
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+def create_custom_xy_class(**kwargs):
+    return MyCustomXYObject(x=kwargs["x"], y=kwargs["y"])
+
+
+class TestReadParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_raise_no_data(self, engine, parquet_file_path):
+        not_existing_parquet = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
+        )
+        with pytest.raises(NoData):
+            assert not_existing_parquet.read() is None
+            not_existing_parquet.read_or_raise()
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_file_pandas(self, engine, parquet_file_path):
+        df = pd.read_parquet(parquet_file_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
+        )
+        data_pandas = parquet_data_node_as_pandas.read()
+        assert isinstance(data_pandas, pd.DataFrame)
+        assert len(data_pandas) == 2
+        assert data_pandas.equals(df)
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_file_numpy(self, engine, parquet_file_path):
+        df = pd.read_parquet(parquet_file_path)
+        parquet_data_node_as_numpy = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
+        )
+        data_numpy = parquet_data_node_as_numpy.read()
+        assert isinstance(data_numpy, np.ndarray)
+        assert len(data_numpy) == 2
+        assert np.array_equal(data_numpy, df.to_numpy())
+
+    def test_read_custom_exposed_type(self):
+        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
+        )
+        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
+        )
+        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_folder_pandas(self, engine):
+        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        df = pd.read_parquet(parquet_folder_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
+        )
+        data_pandas = parquet_data_node_as_pandas.read()
+        assert isinstance(data_pandas, pd.DataFrame)
+        assert len(data_pandas) == 5
+        assert data_pandas.equals(df)
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_folder_numpy(self, engine):
+        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        df = pd.read_parquet(parquet_folder_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine, "exposed_type": "numpy"}
+        )
+        data_numpy = parquet_data_node_as_pandas.read()
+        assert isinstance(data_numpy, np.ndarray)
+        assert len(data_numpy) == 5
+        assert np.array_equal(data_numpy, df.to_numpy())
+
+    def test_read_folder_custom_exposed_type(self):
+        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomXYObject}
+        )
+        dn.read()
+        assert all(isinstance(obj, MyCustomXYObject) for obj in dn.read())
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_xy_class}
+        )
+        assert all(isinstance(obj, MyCustomXYObject) for obj in dn.read())
+
+    def test_read_empty_data(self, tmpdir_factory):
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        empty_df = pd.DataFrame([])
+        empty_df.to_parquet(temp_file_path)
+
+        # Pandas
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
+        assert dn.read().equals(empty_df)
+
+        # Numpy
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
+        assert np.array_equal(dn.read(), empty_df.to_numpy())
+
+        # Custom
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
+        assert dn.read() == []
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
+        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
+        )
+
+        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
+        dn.write(df)
+
+        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
+        assert set(dn.read().columns) == set(read_kwargs["columns"])
+
+        # !!! filter doesn't work with `fastparquet` without partition_cols
+        if engine == "pyarrow":
+            assert len(dn.read()) != len(df)
+            assert len(dn.read()) == 2
+
+    def test_read_with_kwargs_never_written(self):
+        path = "data/node/path"
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
+        assert dn.read_with_kwargs() is None

+ 236 - 0
tests/core/data/test_write_parquet_data_node.py

@@ -0,0 +1,236 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.parquet import ParquetDataNode
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+    def __eq__(self, value) -> bool:
+        return self.id == value.id and self.integer == value.integer and self.text == value.text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+class TestWriteParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_write_after_modify_path(self, engine):
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
+        read_data = dn.read()
+        assert read_data is not None
+        dn.path = new_path
+        with pytest.raises(FileNotFoundError):
+            dn.read()
+        dn.write(read_data)
+        assert dn.read().equals(read_data)
+
+    def test_write_pandas(self, tmpdir_factory):
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
+
+        df = pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])
+        parquet_dn.write(df)
+
+        assert pathlib.Path(temp_file_path).exists()
+
+        dn_data = parquet_dn.read()
+
+        assert isinstance(dn_data, pd.DataFrame)
+        assert dn_data.equals(df)
+
+        parquet_dn.write(df["a"])
+        assert pd.DataFrame.equals(parquet_dn.read(), df[["a"]])
+
+        series = pd.Series([1, 2, 3])
+        parquet_dn.write(series)
+        assert np.array_equal(parquet_dn.read().to_numpy(), pd.DataFrame(series).to_numpy())
+
+        parquet_dn.write(None)
+        assert parquet_dn.read().empty
+
+    def test_write_numpy(self, tmpdir_factory):
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"}
+        )
+
+        arr = np.array([[1], [2], [3], [4], [5]])
+        parquet_dn.write(arr)
+        assert np.array_equal(parquet_dn.read(), arr)
+
+        arr = arr[0:3]
+        parquet_dn.write(arr)
+        assert np.array_equal(parquet_dn.read(), arr)
+
+        parquet_dn.write(None)
+        assert parquet_dn.read().size == 0
+
+    def test_write_custom_exposed_type(self, tmpdir_factory):
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject}
+        )
+
+        data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
+        parquet_dn.write(data)
+        assert all(actual == expected for actual, expected in zip(parquet_dn.read(), data))
+
+        parquet_dn.write(None)
+        assert parquet_dn.read() == []
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_write_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
+        # Precedence:
+        # 1. Class read/write methods
+        # 2. Defined in read_kwargs and write_kwargs, in properties
+        # 3. Defined top-level in properties
+
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
+        df = default_data_frame.copy(deep=True)
+
+        # Write
+        # 3
+        comp3 = "snappy"
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
+        )
+        dn.write(df)
+        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # 3 and 2
+        comp2 = "gzip"
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={
+                "path": temp_file_path,
+                "engine": engine,
+                "compression": comp3,
+                "write_kwargs": {"compression": comp2},
+            },
+        )
+        dn.write(df)
+        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # 3, 2 and 1
+        comp1 = "brotli"
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={
+                "path": temp_file_path,
+                "engine": engine,
+                "compression": comp3,
+                "write_kwargs": {"compression": comp2},
+            },
+        )
+        dn.write_with_kwargs(df, compression=comp1)
+        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # Read
+        df.to_parquet(temp_file_path, engine=engine)
+        # 2
+        cols2 = ["a", "b"]
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
+        )
+        assert set(dn.read().columns) == set(cols2)
+
+        # 1
+        cols1 = ["a"]
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
+        )
+        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)
+
+    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
+        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))
+
+        write_kwargs = {"partition_cols": ["a", "b"]}
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs})  # type: ignore
+        dn.write(default_data_frame)
+
+        assert pathlib.Path(temp_dir_path).is_dir()
+        # dtypes change during round-trip with partition_cols
+        pd.testing.assert_frame_equal(
+            dn.read().sort_index(axis=1),
+            default_data_frame.sort_index(axis=1),
+            check_dtype=False,
+            check_categorical=False,
+        )
+
+    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
+    @pytest.mark.parametrize(
+        "content",
+        [
+            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
+            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
+        ],
+    )
+    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
+        assert_frame_equal(dn.read(), default_data_frame)
+
+        dn.append(content)
+        assert_frame_equal(
+            dn.read(),
+            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
+        )