Browse Source

Add custom exposed type using dataclass

Toan Quach 1 year ago
parent
commit
a9cfde524c

+ 1 - 1
taipy/core/data/based_custom_exposed_type.py

@@ -30,5 +30,5 @@ class BasedCustomExposedType:
         return self.__column_names
 
     @property
-    def to_json(self):
+    def to_dict(self):
         return {col: getattr(self, col) for col in self.__column_names}

+ 13 - 9
taipy/core/data/csv.py

@@ -11,6 +11,7 @@
 
 import csv
 import os
+from dataclasses import asdict, is_dataclass
 from datetime import datetime, timedelta
 from os.path import isfile
 from typing import Any, Dict, List, Optional, Set
@@ -225,19 +226,22 @@ class CSVDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode):
             return data
         elif exposed_type == self._EXPOSED_TYPE_NUMPY and isinstance(data, np.ndarray):
             return pd.DataFrame(data)
-        elif (
-            isinstance(data, list)
-            and not isinstance(exposed_type, str)
-            and all(isinstance(row, exposed_type) for row in data)
-        ):
-            return pd.DataFrame.from_records([row.to_dict() for row in data])
+        elif isinstance(data, list) and not isinstance(exposed_type, str):
+            if all(is_dataclass(row) for row in data):
+                return pd.DataFrame.from_records([asdict(row) for row in data])
+            # return pd.DataFrame.from_records([row.to_dict() for row in data])
         else:
             return pd.DataFrame(data)
 
     def _write(self, data: Any):
-        self.__convert_data_to_dataframe(data).to_csv(
-            self._path, index=False, encoding=self.properties[self.__ENCODING_KEY]
-        )
+        if self.properties[self.__HAS_HEADER_PROPERTY]:
+            self.__convert_data_to_dataframe(data).to_csv(
+                self._path, index=False, encoding=self.properties[self.__ENCODING_KEY]
+            )
+        else:
+            self.__convert_data_to_dataframe(data).to_csv(
+                self._path, index=False, encoding=self.properties[self.__ENCODING_KEY], header=None
+            )
 
     def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None, job_id: Optional[JobId] = None):
         """Write a selection of columns.

+ 2 - 234
tests/core/data/test_csv_data_node.py

@@ -14,10 +14,8 @@ import pathlib
 from datetime import datetime
 from time import sleep
 
-import numpy as np
 import pandas as pd
 import pytest
-from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
@@ -25,8 +23,7 @@ from taipy.config.exceptions.exceptions import InvalidConfigurationId
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.csv import CSVDataNode
 from taipy.core.data.data_node_id import DataNodeId
-from taipy.core.data.operator import JoinOperator, Operator
-from taipy.core.exceptions.exceptions import InvalidExposedType, NoData
+from taipy.core.exceptions.exceptions import InvalidExposedType
 
 
 @pytest.fixture(scope="function", autouse=True)
@@ -65,9 +62,7 @@ class TestCSVDataNode:
         assert dn.exposed_type == "pandas"
 
         with pytest.raises(InvalidConfigurationId):
-            dn = CSVDataNode(
-                "foo bar", Scope.SCENARIO, properties={"path": path, "has_header": False, "name": "super name"}
-            )
+            CSVDataNode("foo bar", Scope.SCENARIO, properties={"path": path, "has_header": False, "name": "super name"})
 
     def test_modin_deprecated_in_favor_of_pandas(self):
         path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
@@ -117,146 +112,6 @@ class TestCSVDataNode:
         dn = CSVDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
         assert os.path.exists(dn.path) is exists
 
-    def test_read_with_header_pandas(self):
-        not_existing_csv = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "WRONG.csv", "has_header": True})
-        with pytest.raises(NoData):
-            assert not_existing_csv.read() is None
-            not_existing_csv.read_or_raise()
-
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
-        # # Create CSVDataNode without exposed_type (Default is pandas.DataFrame)
-        csv_data_node_as_pandas = CSVDataNode("bar", Scope.SCENARIO, properties={"path": path})
-        data_pandas = csv_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 10
-        assert np.array_equal(data_pandas.to_numpy(), pd.read_csv(path).to_numpy())
-
-    def test_read_with_header_numpy(self):
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
-        # Create CSVDataNode with numpy exposed_type
-        csv_data_node_as_numpy = CSVDataNode(
-            "bar", Scope.SCENARIO, properties={"path": path, "has_header": True, "exposed_type": "numpy"}
-        )
-        data_numpy = csv_data_node_as_numpy.read()
-        assert isinstance(data_numpy, np.ndarray)
-        assert len(data_numpy) == 10
-        assert np.array_equal(data_numpy, pd.read_csv(path).to_numpy())
-
-    def test_read_with_header_custom_exposed_type(self):
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
-        csv_data_node_as_pandas = CSVDataNode("bar", Scope.SCENARIO, properties={"path": path})
-        data_pandas = csv_data_node_as_pandas.read()
-
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
-        # Create the same CSVDataNode but with custom exposed_type
-        csv_data_node_as_custom_object = CSVDataNode(
-            "bar", Scope.SCENARIO, properties={"path": path, "exposed_type": MyCustomObject}
-        )
-        data_custom = csv_data_node_as_custom_object.read()
-        assert isinstance(data_custom, list)
-        assert len(data_custom) == 10
-
-        for (_, row_pandas), row_custom in zip(data_pandas.iterrows(), data_custom):
-            assert isinstance(row_custom, MyCustomObject)
-            assert row_pandas["id"] == row_custom.id
-            assert str(row_pandas["integer"]) == row_custom.integer
-            assert row_pandas["text"] == row_custom.text
-
-    def test_read_without_header(self):
-        not_existing_csv = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "WRONG.csv", "has_header": False})
-        with pytest.raises(NoData):
-            assert not_existing_csv.read() is None
-            not_existing_csv.read_or_raise()
-
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
-        # Create CSVDataNode without exposed_type (Default is pandas.DataFrame)
-        csv_data_node_as_pandas = CSVDataNode("bar", Scope.SCENARIO, properties={"path": path, "has_header": False})
-        data_pandas = csv_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 11
-        assert np.array_equal(data_pandas.to_numpy(), pd.read_csv(path, header=None).to_numpy())
-
-        # Create CSVDataNode with numpy exposed_type
-        csv_data_node_as_numpy = CSVDataNode(
-            "qux", Scope.SCENARIO, properties={"path": path, "has_header": False, "exposed_type": "numpy"}
-        )
-        data_numpy = csv_data_node_as_numpy.read()
-        assert isinstance(data_numpy, np.ndarray)
-        assert len(data_numpy) == 11
-        assert np.array_equal(data_numpy, pd.read_csv(path, header=None).to_numpy())
-
-        # Create the same CSVDataNode but with custom exposed_type
-        csv_data_node_as_custom_object = CSVDataNode(
-            "quux", Scope.SCENARIO, properties={"path": path, "has_header": False, "exposed_type": MyCustomObject}
-        )
-        data_custom = csv_data_node_as_custom_object.read()
-        assert isinstance(data_custom, list)
-        assert len(data_custom) == 11
-
-        for (_, row_pandas), row_custom in zip(data_pandas.iterrows(), data_custom):
-            assert isinstance(row_custom, MyCustomObject)
-            assert row_pandas[0] == row_custom.id
-            assert str(row_pandas[1]) == row_custom.integer
-            assert row_pandas[2] == row_custom.text
-
-    @pytest.mark.parametrize(
-        "content",
-        [
-            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
-            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
-            ([[11, 22, 33], [44, 55, 66]]),
-        ],
-    )
-    def test_append(self, csv_file, default_data_frame, content):
-        csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file})
-        assert_frame_equal(csv_dn.read(), default_data_frame)
-
-        csv_dn.append(content)
-        assert_frame_equal(
-            csv_dn.read(),
-            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
-        )
-
-    @pytest.mark.parametrize(
-        "content,columns",
-        [
-            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}], None),
-            ([[11, 22, 33], [44, 55, 66]], None),
-            ([[11, 22, 33], [44, 55, 66]], ["e", "f", "g"]),
-        ],
-    )
-    def test_write(self, csv_file, default_data_frame, content, columns):
-        csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file})
-        assert np.array_equal(csv_dn.read().values, default_data_frame.values)
-        if not columns:
-            csv_dn.write(content)
-            df = pd.DataFrame(content)
-        else:
-            csv_dn.write_with_column_names(content, columns)
-            df = pd.DataFrame(content, columns=columns)
-        assert np.array_equal(csv_dn.read().values, df.values)
-
-        csv_dn.write(None)
-        assert len(csv_dn.read()) == 0
-
-    def test_write_with_different_encoding(self, csv_file):
-        data = pd.DataFrame([{"≥a": 1, "b": 2}])
-
-        utf8_dn = CSVDataNode("utf8_dn", Scope.SCENARIO, properties={"default_path": csv_file})
-        utf16_dn = CSVDataNode("utf16_dn", Scope.SCENARIO, properties={"default_path": csv_file, "encoding": "utf-16"})
-
-        # If a file is written with utf-8 encoding, it can only be read with utf-8, not utf-16 encoding
-        utf8_dn.write(data)
-        assert np.array_equal(utf8_dn.read(), data)
-        with pytest.raises(UnicodeError):
-            utf16_dn.read()
-
-        # If a file is written with utf-16 encoding, it can only be read with utf-16, not utf-8 encoding
-        utf16_dn.write(data)
-        assert np.array_equal(utf16_dn.read(), data)
-        with pytest.raises(UnicodeError):
-            utf8_dn.read()
-
     def test_set_path(self):
         dn = CSVDataNode("foo", Scope.SCENARIO, properties={"default_path": "foo.csv"})
         assert dn.path == "foo.csv"
@@ -280,93 +135,6 @@ class TestCSVDataNode:
         dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
         assert isinstance(dn.read(), pd.DataFrame)
 
-    def test_filter_pandas_exposed_type(self, csv_file):
-        dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file, "exposed_type": "pandas"})
-        dn.write(
-            [
-                {"foo": 1, "bar": 1},
-                {"foo": 1, "bar": 2},
-                {"foo": 1},
-                {"foo": 2, "bar": 2},
-                {"bar": 2},
-            ]
-        )
-
-        # Test datanode indexing and slicing
-        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
-        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
-        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
-
-        # Test filter data
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["foo"] == 1]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
-        filtered_by_indexing = dn[dn["foo"] != 1]
-        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["bar"] == 2]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
-        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
-        expected_data = pd.DataFrame(
-            [
-                {"foo": 1.0, "bar": 1.0},
-                {"foo": 1.0, "bar": 2.0},
-                {"foo": 2.0, "bar": 2.0},
-                {"bar": 2.0},
-            ]
-        )
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-    def test_filter_numpy_exposed_type(self, csv_file):
-        dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file, "exposed_type": "numpy"})
-        dn.write(
-            np.array(
-                [
-                    [1, 1],
-                    [1, 2],
-                    [1, 3],
-                    [2, 1],
-                    [2, 2],
-                    [2, 3],
-                ]
-            )
-        )
-
-        # Test datanode indexing and slicing
-        assert np.array_equal(dn[0], np.array([1, 1]))
-        assert np.array_equal(dn[1], np.array([1, 2]))
-        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
-        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
-
-        # Test filter data
-        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
-
-        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
-        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
-
-        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
-        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
-
-        assert np.array_equal(
-            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
-            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
-        )
-        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))
-
     def test_raise_error_invalid_exposed_type(self):
         path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")
         with pytest.raises(InvalidExposedType):

+ 126 - 0
tests/core/data/test_filter_csv_data_node.py

@@ -0,0 +1,126 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.csv import CSVDataNode
+from taipy.core.data.operator import JoinOperator, Operator
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.csv")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+
+def test_filter_pandas_exposed_type(csv_file):
+    dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file, "exposed_type": "pandas"})
+    dn.write(
+        [
+            {"foo": 1, "bar": 1},
+            {"foo": 1, "bar": 2},
+            {"foo": 1},
+            {"foo": 2, "bar": 2},
+            {"bar": 2},
+        ]
+    )
+
+    # Test datanode indexing and slicing
+    assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
+    assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
+    assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
+
+    # Test filter data
+    filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
+    filtered_by_indexing = dn[dn["foo"] == 1]
+    expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
+    assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+    assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
+    filtered_by_indexing = dn[dn["foo"] != 1]
+    expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+    assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+    assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
+    filtered_by_indexing = dn[dn["bar"] == 2]
+    expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+    assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+    assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
+    filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
+    expected_data = pd.DataFrame(
+        [
+            {"foo": 1.0, "bar": 1.0},
+            {"foo": 1.0, "bar": 2.0},
+            {"foo": 2.0, "bar": 2.0},
+            {"bar": 2.0},
+        ]
+    )
+    assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+    assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+
+def test_filter_numpy_exposed_type(csv_file):
+    dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file, "exposed_type": "numpy"})
+    dn.write(
+        np.array(
+            [
+                [1, 1],
+                [1, 2],
+                [1, 3],
+                [2, 1],
+                [2, 2],
+                [2, 3],
+            ]
+        )
+    )
+
+    # Test datanode indexing and slicing
+    assert np.array_equal(dn[0], np.array([1, 1]))
+    assert np.array_equal(dn[1], np.array([1, 2]))
+    assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
+    assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
+    assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
+
+    # Test filter data
+    assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
+    assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
+
+    assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
+    assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
+
+    assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
+    assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
+
+    assert np.array_equal(
+        dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
+        np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
+    )
+    assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

+ 125 - 0
tests/core/data/test_read_csv_data_node.py

@@ -0,0 +1,125 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import dataclasses
+import os
+import pathlib
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.csv import CSVDataNode
+from taipy.core.exceptions.exceptions import NoData
+
+DATA_SAMPLE_PATH = "data_sample/example.csv"
+
+
+@dataclasses.dataclass
+class MyCustomObject:
+    id: int
+    integer: int
+    text: str
+
+
+def test_raise_no_data_with_header():
+    not_existing_csv = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "WRONG.csv", "has_header": True})
+    with pytest.raises(NoData):
+        assert not_existing_csv.read() is None
+        not_existing_csv.read_or_raise()
+
+
+def test_read_with_header_pandas():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+
+    csv_data_node_as_pandas = CSVDataNode("bar", Scope.SCENARIO, properties={"path": path})
+    data_pandas = csv_data_node_as_pandas.read()
+    assert isinstance(data_pandas, pd.DataFrame)
+    assert len(data_pandas) == 10
+    assert np.array_equal(data_pandas.to_numpy(), pd.read_csv(path).to_numpy())
+
+
+def test_read_with_header_numpy():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+
+    csv_data_node_as_numpy = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": path, "has_header": True, "exposed_type": "numpy"}
+    )
+    data_numpy = csv_data_node_as_numpy.read()
+    assert isinstance(data_numpy, np.ndarray)
+    assert len(data_numpy) == 10
+    assert np.array_equal(data_numpy, pd.read_csv(path).to_numpy())
+
+
+def test_read_with_header_custom_exposed_type():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+    data_pandas = pd.read_csv(path)
+
+    csv_data_node_as_custom_object = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": path, "exposed_type": MyCustomObject}
+    )
+    data_custom = csv_data_node_as_custom_object.read()
+    assert isinstance(data_custom, list)
+    assert len(data_custom) == 10
+
+    for (_, row_pandas), row_custom in zip(data_pandas.iterrows(), data_custom):
+        assert isinstance(row_custom, MyCustomObject)
+        assert row_pandas["id"] == row_custom.id
+        assert str(row_pandas["integer"]) == row_custom.integer
+        assert row_pandas["text"] == row_custom.text
+
+
+def test_raise_no_data_without_header():
+    not_existing_csv = CSVDataNode("foo", Scope.SCENARIO, properties={"path": "WRONG.csv", "has_header": False})
+    with pytest.raises(NoData):
+        assert not_existing_csv.read() is None
+        not_existing_csv.read_or_raise()
+
+
+def test_read_without_header_pandas():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+
+    csv_data_node_as_pandas = CSVDataNode("bar", Scope.SCENARIO, properties={"path": path, "has_header": False})
+    data_pandas = csv_data_node_as_pandas.read()
+    assert isinstance(data_pandas, pd.DataFrame)
+    assert len(data_pandas) == 11
+    assert np.array_equal(data_pandas.to_numpy(), pd.read_csv(path, header=None).to_numpy())
+
+
+def test_read_without_header_numpy():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+
+    csv_data_node_as_numpy = CSVDataNode(
+        "qux", Scope.SCENARIO, properties={"path": path, "has_header": False, "exposed_type": "numpy"}
+    )
+    data_numpy = csv_data_node_as_numpy.read()
+    assert isinstance(data_numpy, np.ndarray)
+    assert len(data_numpy) == 11
+    assert np.array_equal(data_numpy, pd.read_csv(path, header=None).to_numpy())
+
+
+def test_read_without_header_custom_exposed_type():
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), DATA_SAMPLE_PATH)
+
+    csv_data_node_as_custom_object = CSVDataNode(
+        "quux", Scope.SCENARIO, properties={"path": path, "has_header": False, "exposed_type": MyCustomObject}
+    )
+    data_custom = csv_data_node_as_custom_object.read()
+    assert isinstance(data_custom, list)
+    assert len(data_custom) == 11
+
+    data_pandas = pd.read_csv(path, header=None)
+    for (_, row_pandas), row_custom in zip(data_pandas.iterrows(), data_custom):
+        assert isinstance(row_custom, MyCustomObject)
+        assert row_pandas[0] == row_custom.id
+        assert str(row_pandas[1]) == row_custom.integer
+        assert row_pandas[2] == row_custom.text

+ 180 - 0
tests/core/data/test_write_csv_data_node.py

@@ -0,0 +1,180 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import dataclasses
+import os
+import pathlib
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.csv import CSVDataNode
+
+
+@pytest.fixture(scope="function")
+def tmp_csv_file():
+    return os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.csv")
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup(tmp_csv_file):
+    yield
+    if os.path.isfile(tmp_csv_file):
+        os.remove(tmp_csv_file)
+
+
+@dataclasses.dataclass
+class MyCustomObject:
+    id: int
+    integer: int
+    text: str
+
+    def __eq__(self, val) -> bool:
+        return self.id == val.id and self.integer == val.integer and self.text == val.text
+
+    def __post_init__(self):
+        for field in dataclasses.fields(self):
+            value = getattr(self, field.name)
+            if not isinstance(value, field.type):
+                setattr(self, field.name, field.type(value))
+
+
+@pytest.mark.parametrize(
+    "content",
+    [
+        ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
+        (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
+        ([[11, 22, 33], [44, 55, 66]]),
+    ],
+)
+def test_append(csv_file, default_data_frame, content):
+    csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": csv_file})
+    assert_frame_equal(csv_dn.read(), default_data_frame)
+
+    csv_dn.append(content)
+    assert_frame_equal(
+        csv_dn.read(),
+        pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
+    )
+
+
+def test_write_with_header_pandas(tmp_csv_file):
+    csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": tmp_csv_file})
+
+    df = pd.DataFrame([{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}])
+    csv_dn.write(df)
+    assert pd.DataFrame.equals(csv_dn.read(), df)
+
+    csv_dn.write(df["a"])
+    assert pd.DataFrame.equals(csv_dn.read(), df[["a"]])
+
+    series = pd.Series([1, 2, 3])
+    csv_dn.write(series)
+    assert np.array_equal(csv_dn.read().to_numpy(), pd.DataFrame(series).to_numpy())
+
+    csv_dn.write(None)
+    assert csv_dn.read().empty
+
+
+def test_write_with_header_numpy(tmp_csv_file):
+    csv_dn = CSVDataNode("bar", Scope.SCENARIO, properties={"path": tmp_csv_file, "exposed_type": "numpy"})
+
+    arr = np.array([[1], [2], [3], [4], [5]])
+    csv_dn.write(arr)
+    assert np.array_equal(csv_dn.read(), arr)
+
+    arr = arr[0:3]
+    csv_dn.write(arr)
+    assert np.array_equal(csv_dn.read(), arr)
+
+    csv_dn.write(None)
+    assert csv_dn.read().size == 0
+
+
+def test_write_with_header_custom_exposed_type(tmp_csv_file):
+    csv_dn = CSVDataNode("bar", Scope.SCENARIO, properties={"path": tmp_csv_file, "exposed_type": MyCustomObject})
+
+    data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
+    csv_dn.write(data)
+    assert all(actual == expected for actual, expected in zip(csv_dn.read(), data))
+
+    csv_dn.write(None)
+    assert csv_dn.read() == []
+
+
+def test_write_without_header_pandas(tmp_csv_file):
+    csv_dn = CSVDataNode("foo", Scope.SCENARIO, properties={"path": tmp_csv_file, "has_header": False})
+
+    df = pd.DataFrame([*zip([1, 2, 3], [4, 5, 6])])
+    csv_dn.write(df)
+    assert pd.DataFrame.equals(csv_dn.read(), df)
+
+    csv_dn.write(df[0])
+    assert pd.DataFrame.equals(csv_dn.read(), df[[0]])
+
+    series = pd.Series([1, 2, 3])
+    csv_dn.write(series)
+    assert np.array_equal(csv_dn.read().to_numpy(), pd.DataFrame(series).to_numpy())
+
+    csv_dn.write(None)
+    assert csv_dn.read().empty
+
+
+def test_write_without_header_numpy(tmp_csv_file):
+    csv_dn = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": tmp_csv_file, "exposed_type": "numpy", "has_header": False}
+    )
+
+    arr = np.array([[1], [2], [3], [4], [5]])
+    csv_dn.write(arr)
+    assert np.array_equal(csv_dn.read(), arr)
+
+    arr = arr[0:3]
+    csv_dn.write(arr)
+    assert np.array_equal(csv_dn.read(), arr)
+
+    csv_dn.write(None)
+    assert csv_dn.read().size == 0
+
+
+def test_write_without_header_custom_exposed_type(tmp_csv_file):
+    csv_dn = CSVDataNode(
+        "bar", Scope.SCENARIO, properties={"path": tmp_csv_file, "exposed_type": MyCustomObject, "has_header": False}
+    )
+
+    data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
+    csv_dn.write(data)
+    assert all(actual == expected for actual, expected in zip(csv_dn.read(), data))
+
+    csv_dn.write(None)
+    assert csv_dn.read() == []
+
+
+def test_write_with_different_encoding(csv_file):
+    data = pd.DataFrame([{"≥a": 1, "b": 2}])
+
+    utf8_dn = CSVDataNode("utf8_dn", Scope.SCENARIO, properties={"default_path": csv_file})
+    utf16_dn = CSVDataNode("utf16_dn", Scope.SCENARIO, properties={"default_path": csv_file, "encoding": "utf-16"})
+
+    # If a file is written with utf-8 encoding, it can only be read with utf-8, not utf-16 encoding
+    utf8_dn.write(data)
+    assert np.array_equal(utf8_dn.read(), data)
+    with pytest.raises(UnicodeError):
+        utf16_dn.read()
+
+    # If a file is written with utf-16 encoding, it can only be read with utf-16, not utf-8 encoding
+    utf16_dn.write(data)
+    assert np.array_equal(utf16_dn.read(), data)
+    with pytest.raises(UnicodeError):
+        utf8_dn.read()