
Merge pull request #936 from Avaiga/feature/#797-custom-exposed-type-sql-and-parquet

Feature/#797 custom exposed type sql and parquet
Toan Quach, 1 year ago
parent commit d298a58942

+ 1 - 0
taipy/core/data/_abstract_sql.py

@@ -117,6 +117,7 @@ class _AbstractSQLDataNode(DataNode, _AbstractTabularDataNode):
             editor_expiration_date,
             **properties,
         )
+        _AbstractTabularDataNode.__init__(self, **properties)
         self._engine = None
         if not self._last_edit_date:  # type: ignore
             self._last_edit_date = datetime.now()

+ 9 - 6
taipy/core/data/parquet.py

@@ -158,6 +158,8 @@ class ParquetDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode)
             editor_expiration_date,
             **properties,
         )
+        _AbstractTabularDataNode.__init__(self, **properties)
+
         self._path = properties.get(self.__PATH_KEY, properties.get(self.__DEFAULT_PATH_KEY))
 
         if self._path and ".data" in self._path:
@@ -249,13 +251,14 @@ class ParquetDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode)
         }
         kwargs.update(self.properties[self.__WRITE_KWARGS_PROPERTY])
         kwargs.update(write_kwargs)
-        if isinstance(data, pd.DataFrame):
-            data.to_parquet(self._path, **kwargs)
+        if isinstance(data, pd.Series):
+            df = pd.DataFrame(data)
         else:
-            _df = pd.DataFrame(data)
-            # Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
-            _df.columns = _df.columns.astype(str)
-            _df.to_parquet(self._path, **kwargs)
+            df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)
+
+        # Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
+        df.columns = df.columns.astype(str)
+        df.to_parquet(self._path, **kwargs)
         self.track_edit(timestamp=datetime.now(), job_id=job_id)
 
     def read_with_kwargs(self, **read_kwargs):
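
The rewritten write path above, like the SQL table path in the next file, delegates conversion of arbitrary input (lists of dicts or tuples, numpy arrays, custom objects) to _convert_data_to_dataframe inherited from _AbstractTabularDataNode, which is likely why the explicit _AbstractTabularDataNode.__init__ calls were added. The helper's implementation is not part of this diff; the sketch below is only an assumed illustration of its behaviour, not the actual code.

    # Assumed sketch of the shared conversion helper; not the real implementation.
    import pandas as pd

    def _convert_data_to_dataframe(exposed_type, data):
        if isinstance(data, (pd.DataFrame, pd.Series)):
            # pandas input passes through; a Series becomes a one-column frame
            return pd.DataFrame(data)
        if not isinstance(exposed_type, str) and isinstance(data, list):
            # custom exposed types: assume each object carries its fields in __dict__
            return pd.DataFrame([v.__dict__ for v in data])
        # dicts, lists of tuples, numpy arrays, and scalars fall back to the
        # default DataFrame constructor
        return pd.DataFrame(data)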

+ 8 - 38
taipy/core/data/sql_table.py

@@ -10,9 +10,8 @@
 # specific language governing permissions and limitations under the License.
 
 from datetime import datetime, timedelta
-from typing import Any, Dict, List, Optional, Set, Tuple, Union
+from typing import Any, Dict, List, Optional, Set
 
-import numpy as np
 import pandas as pd
 from sqlalchemy import MetaData, Table
 
@@ -123,26 +122,12 @@ class SQLTableDataNode(_AbstractSQLDataNode):
 
     def __insert_data(self, data, engine, connection, delete_table: bool = False) -> None:
         table = self._create_table(engine)
-        if isinstance(data, pd.DataFrame):
-            self.__insert_dataframe(data, table, connection, delete_table)
-            return
-
-        if isinstance(data, np.ndarray):
-            data = data.tolist()
-        if not isinstance(data, list):
-            data = [data]
-
-        if len(data) == 0:
-            self.__delete_all_rows(table, connection, delete_table)
-            return
-
-        if isinstance(data[0], (tuple, list)):
-            self.__insert_tuples(data, table, connection, delete_table)
-        elif isinstance(data[0], dict):
-            self.__insert_dicts(data, table, connection, delete_table)
-        # If data is a primitive type, it will be inserted as a tuple of one element.
-        else:
-            self.__insert_tuples([(x,) for x in data], table, connection, delete_table)
+        self.__insert_dataframe(
+            self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data),
+            table,
+            connection,
+            delete_table,
+        )
 
     def _create_table(self, engine) -> Table:
         return Table(
@@ -161,24 +146,9 @@ class SQLTableDataNode(_AbstractSQLDataNode):
         connection.execute(table.insert(), data)
 
     @classmethod
-    def __insert_dataframe(
-        cls, df: pd.DataFrame, table: Any, connection: Any, delete_table: bool
-    ) -> None:
+    def __insert_dataframe(cls, df: pd.DataFrame, table: Any, connection: Any, delete_table: bool) -> None:
         cls.__insert_dicts(df.to_dict(orient="records"), table, connection, delete_table)
 
-    @classmethod
-    def __insert_tuples(cls, data: List[Union[Tuple, List]], table: Any, connection: Any, delete_table: bool) -> None:
-        """
-        This method will look up the length of the first object of the list and build the insert through
-        creation of a string of '?' equivalent to the length of the element. The '?' character is used as
-        placeholder for a tuple of same size.
-        """
-        cls.__delete_all_rows(table, connection, delete_table)
-        markers = ",".join("?" * len(data[0]))
-        ins = "INSERT INTO {tablename} VALUES ({markers})"
-        ins = ins.format(tablename=table.name, markers=markers)
-        connection.execute(ins, data)
-
     @classmethod
     def __delete_all_rows(cls, table: Any, connection: Any, delete_table: bool) -> None:
         if delete_table:
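
With the conversion centralised, every SQLTableDataNode.write() call now funnels through a single DataFrame path into __insert_dataframe, so dicts, tuples, DataFrames, and custom objects are handled uniformly. An illustrative call, reusing the sqlite properties from the new tests (the names and values here are only an example, not part of the diff):

    import pandas as pd

    from taipy.config.common.scope import Scope
    from taipy.core.data.sql_table import SQLTableDataNode

    # sqlite properties mirroring the test configuration below
    dn = SQLTableDataNode(
        "example_dn",
        Scope.SCENARIO,
        properties={"db_engine": "sqlite", "db_name": "taipy", "table_name": "example"},
    )

    # each input shape is converted to a DataFrame before insertion
    dn.write([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
    dn.write(pd.DataFrame({"foo": [1, 3], "bar": [2, 4]}))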

+ 139 - 0
tests/core/data/test_filter_parquet_data_node.py

@@ -0,0 +1,139 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.operator import JoinOperator, Operator
+from taipy.core.data.parquet import ParquetDataNode
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+class TestFilterParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    def test_filter_pandas_exposed_type(self, parquet_file_path):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
+        dn.write(
+            [
+                {"foo": 1, "bar": 1},
+                {"foo": 1, "bar": 2},
+                {"foo": 1},
+                {"foo": 2, "bar": 2},
+                {"bar": 2},
+            ]
+        )
+
+        # Test datanode indexing and slicing
+        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
+        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
+        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
+
+        # Test filter data
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
+        filtered_by_indexing = dn[dn["foo"] == 1]
+        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
+        filtered_by_indexing = dn[dn["foo"] != 1]
+        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
+        filtered_by_indexing = dn[dn["bar"] == 2]
+        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
+        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
+        expected_data = pd.DataFrame(
+            [
+                {"foo": 1.0, "bar": 1.0},
+                {"foo": 1.0, "bar": 2.0},
+                {"foo": 2.0, "bar": 2.0},
+                {"bar": 2.0},
+            ]
+        )
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    def test_filter_numpy_exposed_type(self, parquet_file_path):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
+        dn.write(
+            [
+                [1, 1],
+                [1, 2],
+                [1, 3],
+                [2, 1],
+                [2, 2],
+                [2, 3],
+            ]
+        )
+
+        # Test datanode indexing and slicing
+        assert np.array_equal(dn[0], np.array([1, 1]))
+        assert np.array_equal(dn[1], np.array([1, 2]))
+        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
+        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
+
+        # Test filter data
+        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
+
+        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
+        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
+
+        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
+        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
+
+        assert np.array_equal(
+            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
+            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
+        )
+        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

+ 207 - 0
tests/core/data/test_filter_sql_table_data_node.py

@@ -0,0 +1,207 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from importlib import util
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.operator import JoinOperator, Operator
+from taipy.core.data.sql_table import SQLTableDataNode
+
+
+class MyCustomObject:
+    def __init__(self, foo=None, bar=None, *args, **kwargs):
+        self.foo = foo
+        self.bar = bar
+        self.args = args
+        self.kwargs = kwargs
+
+
+class TestFilterSQLTableDataNode:
+    __pandas_properties = [
+        {
+            "db_name": "taipy",
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_extra_args": {
+                "TrustServerCertificate": "yes",
+                "other": "value",
+            },
+        },
+    ]
+
+    if util.find_spec("pyodbc"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mssql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("pymysql"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mysql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("psycopg2"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "postgresql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    def test_filter_pandas_exposed_type(self, tmp_sqlite_sqlite3_file_path):
+        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
+        properties = {
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_name": db_name,
+            "sqlite_folder_path": folder_path,
+            "sqlite_file_extension": file_extension,
+            "exposed_type": "pandas",
+        }
+        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
+        dn.write(
+            pd.DataFrame(
+                [
+                    {"foo": 1, "bar": 1},
+                    {"foo": 1, "bar": 2},
+                    {"foo": 1, "bar": 3},
+                    {"foo": 2, "bar": 1},
+                    {"foo": 2, "bar": 2},
+                    {"foo": 2, "bar": 3},
+                ]
+            )
+        )
+
+        # Test datanode indexing and slicing
+        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, 2, 2]))
+        assert dn["bar"].equals(pd.Series([1, 2, 3, 1, 2, 3]))
+        assert dn[:2].equals(pd.DataFrame([{"foo": 1, "bar": 1}, {"foo": 1, "bar": 2}]))
+
+        # Test filter data
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
+        filtered_by_indexing = dn[dn["foo"] == 1]
+        expected_data = pd.DataFrame([{"foo": 1, "bar": 1}, {"foo": 1, "bar": 2}, {"foo": 1, "bar": 3}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
+        filtered_by_indexing = dn[dn["foo"] != 1]
+        expected_data = pd.DataFrame([{"foo": 2, "bar": 1}, {"foo": 2, "bar": 2}, {"foo": 2, "bar": 3}])
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
+        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
+        expected_data = pd.DataFrame(
+            [
+                {"foo": 1, "bar": 1},
+                {"foo": 1, "bar": 2},
+                {"foo": 2, "bar": 1},
+                {"foo": 2, "bar": 2},
+            ]
+        )
+        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
+        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
+
+    def test_filter_numpy_exposed_type(self, tmp_sqlite_sqlite3_file_path):
+        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
+        properties = {
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_name": db_name,
+            "sqlite_folder_path": folder_path,
+            "sqlite_file_extension": file_extension,
+            "exposed_type": "numpy",
+        }
+        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
+        dn.write(
+            pd.DataFrame(
+                [
+                    {"foo": 1, "bar": 1},
+                    {"foo": 1, "bar": 2},
+                    {"foo": 1, "bar": 3},
+                    {"foo": 2, "bar": 1},
+                    {"foo": 2, "bar": 2},
+                    {"foo": 2, "bar": 3},
+                ]
+            )
+        )
+
+        # Test datanode indexing and slicing
+        assert np.array_equal(dn[0], np.array([1, 1]))
+        assert np.array_equal(dn[1], np.array([1, 2]))
+        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
+        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
+
+        # Test filter data
+        assert np.array_equal(dn.filter(("foo", 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
+        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
+
+        assert np.array_equal(dn.filter(("foo", 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
+        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
+
+        assert np.array_equal(dn.filter(("bar", 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
+        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
+
+        assert np.array_equal(
+            dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR),
+            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
+        )
+        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))
+
+    def test_filter_does_not_read_all_entities(self, tmp_sqlite_sqlite3_file_path):
+        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
+        properties = {
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_name": db_name,
+            "sqlite_folder_path": folder_path,
+            "sqlite_file_extension": file_extension,
+            "exposed_type": "numpy",
+        }
+        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
+
+        # SQLTableDataNode.filter() should not call the SQLTableDataNode._read() method
+        with patch.object(SQLTableDataNode, "_read") as read_mock:
+            dn.filter(("foo", 1, Operator.EQUAL))
+            dn.filter(("bar", 2, Operator.NOT_EQUAL))
+            dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
+
+            assert read_mock.call_count == 0

+ 0 - 327
tests/core/data/test_parquet_data_node.py

@@ -15,21 +15,17 @@ from datetime import datetime
 from importlib import util
 from time import sleep
 
-import numpy as np
 import pandas as pd
 import pytest
-from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
 from taipy.config.exceptions.exceptions import InvalidConfigurationId
 from taipy.core.data._data_manager import _DataManager
 from taipy.core.data.data_node_id import DataNodeId
-from taipy.core.data.operator import JoinOperator, Operator
 from taipy.core.data.parquet import ParquetDataNode
 from taipy.core.exceptions.exceptions import (
     InvalidExposedType,
-    NoData,
     UnknownCompressionAlgorithm,
     UnknownParquetEngine,
 )
@@ -143,81 +139,12 @@ class TestParquetDataNode:
         data_modin = parquet_data_node_as_modin.read()
         assert isinstance(data_modin, pd.DataFrame)
 
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_file(self, engine, parquet_file_path):
-        not_existing_parquet = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
-        )
-        with pytest.raises(NoData):
-            assert not_existing_parquet.read() is None
-            not_existing_parquet.read_or_raise()
-
-        df = pd.read_parquet(parquet_file_path)
-        # Create ParquetDataNode without exposed_type (Default is pandas.DataFrame)
-        parquet_data_node_as_pandas = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
-        )
-        data_pandas = parquet_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 2
-        assert data_pandas.equals(df)
-        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())
-
-        # Create ParquetDataNode with numpy exposed_type
-        parquet_data_node_as_numpy = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
-        )
-        data_numpy = parquet_data_node_as_numpy.read()
-        assert isinstance(data_numpy, np.ndarray)
-        assert len(data_numpy) == 2
-        assert np.array_equal(data_numpy, df.to_numpy())
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_folder(self, engine):
-        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
-
-        df = pd.read_parquet(parquet_folder_path)
-        parquet_data_node_as_pandas = ParquetDataNode(
-            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
-        )
-        data_pandas = parquet_data_node_as_pandas.read()
-        assert isinstance(data_pandas, pd.DataFrame)
-        assert len(data_pandas) == 5
-        assert data_pandas.equals(df)
-        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())
-
     def test_set_path(self):
         dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"})
         assert dn.path == "foo.parquet"
         dn.path = "bar.parquet"
         assert dn.path == "bar.parquet"
 
-    @pytest.mark.parametrize("engine", __engine)
-    def test_read_write_after_modify_path(self, engine):
-        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
-        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
-        read_data = dn.read()
-        assert read_data is not None
-        dn.path = new_path
-        with pytest.raises(FileNotFoundError):
-            dn.read()
-        dn.write(read_data)
-        assert dn.read().equals(read_data)
-
-    def test_read_custom_exposed_type(self):
-        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
-
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
-        )
-        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())
-
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
-        )
-        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())
-
     def test_raise_error_unknown_parquet_engine(self):
         path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
         with pytest.raises(UnknownParquetEngine):
@@ -233,23 +160,6 @@ class TestParquetDataNode:
         with pytest.raises(InvalidExposedType):
             ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"})
 
-    def test_read_empty_data(self, tmpdir_factory):
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        empty_df = pd.DataFrame([])
-        empty_df.to_parquet(temp_file_path)
-
-        # Pandas
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
-        assert dn.read().equals(empty_df)
-
-        # Numpy
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
-        assert np.array_equal(dn.read(), empty_df.to_numpy())
-
-        # Custom
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
-        assert dn.read() == []
-
     def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
         temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
         pd.DataFrame([]).to_parquet(temp_file_path)
@@ -297,243 +207,6 @@ class TestParquetDataNode:
 
         os.unlink(temp_file_path)
 
-    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
-    @pytest.mark.parametrize(
-        "content",
-        [
-            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
-            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
-        ],
-    )
-    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
-        assert_frame_equal(dn.read(), default_data_frame)
-
-        dn.append(content)
-        assert_frame_equal(
-            dn.read(),
-            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
-        )
-
-    @pytest.mark.parametrize(
-        "data",
-        [
-            [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}],
-            pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
-        ],
-    )
-    def test_write_to_disk(self, tmpdir_factory, data):
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
-        dn.write(data)
-
-        assert pathlib.Path(temp_file_path).exists()
-        assert isinstance(dn.read(), pd.DataFrame)
-
-    def test_filter_pandas_exposed_type(self, parquet_file_path):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
-        dn.write(
-            [
-                {"foo": 1, "bar": 1},
-                {"foo": 1, "bar": 2},
-                {"foo": 1},
-                {"foo": 2, "bar": 2},
-                {"bar": 2},
-            ]
-        )
-
-        # Test datanode indexing and slicing
-        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
-        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
-        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))
-
-        # Test filter data
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["foo"] == 1]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
-        filtered_by_indexing = dn[dn["foo"] != 1]
-        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["bar"] == 2]
-        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
-        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
-        expected_data = pd.DataFrame(
-            [
-                {"foo": 1.0, "bar": 1.0},
-                {"foo": 1.0, "bar": 2.0},
-                {"foo": 2.0, "bar": 2.0},
-                {"bar": 2.0},
-            ]
-        )
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-    def test_filter_numpy_exposed_type(self, parquet_file_path):
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
-        dn.write(
-            [
-                [1, 1],
-                [1, 2],
-                [1, 3],
-                [2, 1],
-                [2, 2],
-                [2, 3],
-            ]
-        )
-
-        # Test datanode indexing and slicing
-        assert np.array_equal(dn[0], np.array([1, 1]))
-        assert np.array_equal(dn[1], np.array([1, 2]))
-        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
-        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
-
-        # Test filter data
-        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
-
-        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
-        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
-
-        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
-        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
-
-        assert np.array_equal(
-            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
-            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
-        )
-        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
-        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
-        )
-
-        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
-        dn.write(df)
-
-        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
-        assert set(dn.read().columns) == set(read_kwargs["columns"])
-
-        # !!! filter doesn't work with `fastparquet` without partition_cols
-        if engine == "pyarrow":
-            assert len(dn.read()) != len(df)
-            assert len(dn.read()) == 2
-
-    @pytest.mark.parametrize("engine", __engine)
-    def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
-        # Precedence:
-        # 1. Class read/write methods
-        # 2. Defined in read_kwargs and write_kwargs, in properties
-        # 3. Defined top-level in properties
-
-        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
-        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
-        df = default_data_frame.copy(deep=True)
-
-        # Write
-        # 3
-        comp3 = "snappy"
-        dn = ParquetDataNode(
-            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
-        )
-        dn.write(df)
-        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # 3 and 2
-        comp2 = "gzip"
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={
-                "path": temp_file_path,
-                "engine": engine,
-                "compression": comp3,
-                "write_kwargs": {"compression": comp2},
-            },
-        )
-        dn.write(df)
-        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # 3, 2 and 1
-        comp1 = "brotli"
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={
-                "path": temp_file_path,
-                "engine": engine,
-                "compression": comp3,
-                "write_kwargs": {"compression": comp2},
-            },
-        )
-        dn.write_with_kwargs(df, compression=comp1)
-        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
-        with open(temp_file_2_path, "rb") as tf:
-            with pathlib.Path(temp_file_path).open("rb") as f:
-                assert f.read() == tf.read()
-
-        # Read
-        df.to_parquet(temp_file_path, engine=engine)
-        # 2
-        cols2 = ["a", "b"]
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
-        )
-        assert set(dn.read().columns) == set(cols2)
-
-        # 1
-        cols1 = ["a"]
-        dn = ParquetDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
-        )
-        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)
-
-    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
-        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))
-
-        write_kwargs = {"partition_cols": ["a", "b"]}
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs})  # type: ignore
-        dn.write(default_data_frame)
-
-        assert pathlib.Path(temp_dir_path).is_dir()
-        # dtypes change during round-trip with partition_cols
-        pd.testing.assert_frame_equal(
-            dn.read().sort_index(axis=1),
-            default_data_frame.sort_index(axis=1),
-            check_dtype=False,
-            check_categorical=False,
-        )
-
-    def test_read_with_kwargs_never_written(self):
-        path = "data/node/path"
-        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
-        assert dn.read_with_kwargs() is None
-
     def test_migrate_to_new_path(self, tmp_path):
         _base_path = os.path.join(tmp_path, ".data")
         path = os.path.join(_base_path, "test.parquet")

+ 188 - 0
tests/core/data/test_read_parquet_data_node.py

@@ -0,0 +1,188 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.parquet import ParquetDataNode
+from taipy.core.exceptions.exceptions import NoData
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+class MyCustomXYObject:
+    def __init__(self, x, y):
+        self.x = x
+        self.y = y
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+def create_custom_xy_class(**kwargs):
+    return MyCustomXYObject(x=kwargs["x"], y=kwargs["y"])
+
+
+class TestReadParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_raise_no_data(self, engine, parquet_file_path):
+        not_existing_parquet = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
+        )
+        with pytest.raises(NoData):
+            assert not_existing_parquet.read() is None
+            not_existing_parquet.read_or_raise()
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_file_pandas(self, engine, parquet_file_path):
+        df = pd.read_parquet(parquet_file_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
+        )
+        data_pandas = parquet_data_node_as_pandas.read()
+        assert isinstance(data_pandas, pd.DataFrame)
+        assert len(data_pandas) == 2
+        assert data_pandas.equals(df)
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_file_numpy(self, engine, parquet_file_path):
+        df = pd.read_parquet(parquet_file_path)
+        parquet_data_node_as_numpy = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
+        )
+        data_numpy = parquet_data_node_as_numpy.read()
+        assert isinstance(data_numpy, np.ndarray)
+        assert len(data_numpy) == 2
+        assert np.array_equal(data_numpy, df.to_numpy())
+
+    def test_read_custom_exposed_type(self):
+        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
+        )
+        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
+        )
+        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_folder_pandas(self, engine):
+        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        df = pd.read_parquet(parquet_folder_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
+        )
+        data_pandas = parquet_data_node_as_pandas.read()
+        assert isinstance(data_pandas, pd.DataFrame)
+        assert len(data_pandas) == 5
+        assert data_pandas.equals(df)
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_parquet_folder_numpy(self, engine):
+        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        df = pd.read_parquet(parquet_folder_path)
+        parquet_data_node_as_pandas = ParquetDataNode(
+            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine, "exposed_type": "numpy"}
+        )
+        data_numpy = parquet_data_node_as_pandas.read()
+        assert isinstance(data_numpy, np.ndarray)
+        assert len(data_numpy) == 5
+        assert np.array_equal(data_numpy, df.to_numpy())
+
+    def test_read_folder_custom_exposed_type(self):
+        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomXYObject}
+        )
+        dn.read()
+        assert all(isinstance(obj, MyCustomXYObject) for obj in dn.read())
+
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_xy_class}
+        )
+        assert all(isinstance(obj, MyCustomXYObject) for obj in dn.read())
+
+    def test_read_empty_data(self, tmpdir_factory):
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        empty_df = pd.DataFrame([])
+        empty_df.to_parquet(temp_file_path)
+
+        # Pandas
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
+        assert dn.read().equals(empty_df)
+
+        # Numpy
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
+        assert np.array_equal(dn.read(), empty_df.to_numpy())
+
+        # Custom
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
+        assert dn.read() == []
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
+        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
+        )
+
+        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
+        dn.write(df)
+
+        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
+        assert set(dn.read().columns) == set(read_kwargs["columns"])
+
+        # !!! filter doesn't work with `fastparquet` without partition_cols
+        if engine == "pyarrow":
+            assert len(dn.read()) != len(df)
+            assert len(dn.read()) == 2
+
+    def test_read_with_kwargs_never_written(self):
+        path = "data/node/path"
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
+        assert dn.read_with_kwargs() is None

+ 176 - 0
tests/core/data/test_read_sql_table_data_node.py

@@ -0,0 +1,176 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from importlib import util
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.sql_table import SQLTableDataNode
+
+
+class MyCustomObject:
+    def __init__(self, foo=None, bar=None, *args, **kwargs):
+        self.foo = foo
+        self.bar = bar
+        self.args = args
+        self.kwargs = kwargs
+
+
+class TestReadSQLTableDataNode:
+    __pandas_properties = [
+        {
+            "db_name": "taipy",
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_extra_args": {
+                "TrustServerCertificate": "yes",
+                "other": "value",
+            },
+        },
+    ]
+
+    if util.find_spec("pyodbc"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mssql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("pymysql"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mysql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("psycopg2"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "postgresql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    @staticmethod
+    def mock_read_value():
+        return {"foo": ["baz", "quux", "corge"], "bar": ["quux", "quuz", None]}
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_read_pandas(self, pandas_properties):
+        custom_properties = pandas_properties.copy()
+
+        sql_data_node_as_pandas = SQLTableDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties=custom_properties,
+        )
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.return_value = self.mock_read_value()
+
+            pandas_data = sql_data_node_as_pandas.read()
+            assert isinstance(pandas_data, pd.DataFrame)
+            assert pandas_data.equals(pd.DataFrame(self.mock_read_value()))
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_read_numpy(self, pandas_properties):
+        custom_properties = pandas_properties.copy()
+        custom_properties["exposed_type"] = "numpy"
+
+        sql_data_node_as_pandas = SQLTableDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties=custom_properties,
+        )
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.return_value = self.mock_read_value()
+
+            numpy_data = sql_data_node_as_pandas.read()
+            assert isinstance(numpy_data, np.ndarray)
+            assert np.array_equal(numpy_data, pd.DataFrame(self.mock_read_value()).to_numpy())
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_read_custom_exposed_type(self, pandas_properties):
+        custom_properties = pandas_properties.copy()
+
+        custom_properties.pop("db_extra_args")
+        custom_properties["exposed_type"] = MyCustomObject
+        sql_data_node = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
+
+        mock_return_data = [
+            {"foo": "baz", "bar": "qux"},
+            {"foo": "quux", "bar": "quuz"},
+            {"foo": "corge"},
+            {"bar": "grault"},
+            {"KWARGS_KEY": "KWARGS_VALUE"},
+            {},
+        ]
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.return_value = mock_return_data
+            custom_data = sql_data_node.read()
+
+        for row_mock_data, row_custom in zip(mock_return_data, custom_data):
+            assert isinstance(row_custom, MyCustomObject)
+            assert row_custom.foo == row_mock_data.pop("foo", None)
+            assert row_custom.bar == row_mock_data.pop("bar", None)
+            assert row_custom.kwargs == row_mock_data
+
+    @pytest.mark.parametrize(
+        "tmp_sqlite_path",
+        [
+            "tmp_sqlite_db_file_path",
+            "tmp_sqlite_sqlite3_file_path",
+        ],
+    )
+    def test_sqlite_read_file_with_different_extension(self, tmp_sqlite_path, request):
+        tmp_sqlite_path = request.getfixturevalue(tmp_sqlite_path)
+        folder_path, db_name, file_extension = tmp_sqlite_path
+        properties = {
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_name": db_name,
+            "sqlite_folder_path": folder_path,
+            "sqlite_file_extension": file_extension,
+        }
+
+        dn = SQLTableDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
+        data = dn.read()
+
+        assert data.equals(pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}]))

+ 1 - 326
tests/core/data/test_sql_table_data_node.py

@@ -12,14 +12,10 @@
 from importlib import util
 from unittest.mock import patch
 
-import numpy as np
-import pandas as pd
 import pytest
-from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.core.data.data_node_id import DataNodeId
-from taipy.core.data.operator import JoinOperator, Operator
 from taipy.core.data.sql_table import SQLTableDataNode
 from taipy.core.exceptions.exceptions import InvalidExposedType, MissingRequiredProperty
 
@@ -140,122 +136,6 @@ class TestSQLTableDataNode:
         assert sql_data_node_as_modin.properties["exposed_type"] == "pandas"
         assert sql_data_node_as_modin.read() == "pandas"
 
-    @patch("taipy.core.data.sql_table.SQLTableDataNode._read_as", return_value="custom")
-    @patch("taipy.core.data.sql_table.SQLTableDataNode._read_as_pandas_dataframe", return_value="pandas")
-    @patch("taipy.core.data.sql_table.SQLTableDataNode._read_as_numpy", return_value="numpy")
-    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
-    def test_read(
-        self,
-        mock_read_as,
-        mock_read_as_pandas_dataframe,
-        mock_read_as_numpy,
-        pandas_properties,
-    ):
-        custom_properties = pandas_properties.copy()
-        # Create SQLTableDataNode without exposed_type (Default is pandas.DataFrame)
-        sql_data_node_as_pandas = SQLTableDataNode(
-            "foo",
-            Scope.SCENARIO,
-            properties=pandas_properties,
-        )
-
-        assert sql_data_node_as_pandas.read() == "pandas"
-
-        custom_properties.pop("db_extra_args")
-        custom_properties["exposed_type"] = MyCustomObject
-        # Create the same SQLTableDataNode but with custom exposed_type
-        sql_data_node_as_custom_object = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-        assert sql_data_node_as_custom_object.read() == "custom"
-
-        # Create the same SQLDataSource but with numpy exposed_type
-        custom_properties["exposed_type"] = "numpy"
-        sql_data_source_as_numpy_object = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-
-        assert sql_data_source_as_numpy_object.read() == "numpy"
-
-
-    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
-    def test_read_as(self, pandas_properties):
-        custom_properties = pandas_properties.copy()
-
-        custom_properties.pop("db_extra_args")
-        custom_properties["exposed_type"] = MyCustomObject
-        sql_data_node = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-
-        with patch("sqlalchemy.engine.Engine.connect") as engine_mock:
-            cursor_mock = engine_mock.return_value.__enter__.return_value
-            cursor_mock.execute.return_value = [
-                {"foo": "baz", "bar": "qux"},
-                {"foo": "quux", "bar": "quuz"},
-                {"foo": "corge"},
-                {"bar": "grault"},
-                {"KWARGS_KEY": "KWARGS_VALUE"},
-                {},
-            ]
-            data = sql_data_node._read_as()
-
-        assert isinstance(data, list)
-        assert isinstance(data[0], MyCustomObject)
-        assert isinstance(data[1], MyCustomObject)
-        assert isinstance(data[2], MyCustomObject)
-        assert isinstance(data[3], MyCustomObject)
-        assert isinstance(data[4], MyCustomObject)
-        assert isinstance(data[5], MyCustomObject)
-
-        assert data[0].foo == "baz"
-        assert data[0].bar == "qux"
-        assert data[1].foo == "quux"
-        assert data[1].bar == "quuz"
-        assert data[2].foo == "corge"
-        assert data[2].bar is None
-        assert data[3].foo is None
-        assert data[3].bar == "grault"
-        assert data[4].foo is None
-        assert data[4].bar is None
-        assert data[4].kwargs["KWARGS_KEY"] == "KWARGS_VALUE"
-        assert data[5].foo is None
-        assert data[5].bar is None
-        assert len(data[5].args) == 0
-        assert len(data[5].kwargs) == 0
-
-        with patch("sqlalchemy.engine.Engine.connect") as engine_mock:
-            cursor_mock = engine_mock.return_value.__enter__.return_value
-            cursor_mock.execute.return_value = []
-            data_2 = sql_data_node._read_as()
-        assert isinstance(data_2, list)
-        assert len(data_2) == 0
-
-    @pytest.mark.parametrize(
-        "data,written_data,called_func",
-        [
-            ([{"a": 1, "b": 2}, {"a": 3, "b": 4}], [{"a": 1, "b": 2}, {"a": 3, "b": 4}], "__insert_dicts"),
-            ({"a": 1, "b": 2}, [{"a": 1, "b": 2}], "__insert_dicts"),
-            ([(1, 2), (3, 4)], [(1, 2), (3, 4)], "__insert_tuples"),
-            ([[1, 2], [3, 4]], [[1, 2], [3, 4]], "__insert_tuples"),
-            ((1, 2), [(1, 2)], "__insert_tuples"),
-            ([1, 2, 3, 4], [(1,), (2,), (3,), (4,)], "__insert_tuples"),
-            ("foo", [("foo",)], "__insert_tuples"),
-            (None, [(None,)], "__insert_tuples"),
-            (np.array([1, 2, 3, 4]), [(1,), (2,), (3,), (4,)], "__insert_tuples"),
-            (np.array([np.array([1, 2]), np.array([3, 4])]), [[1, 2], [3, 4]], "__insert_tuples"),
-        ],
-    )
-    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
-    def test_write_1(self, data, written_data, called_func, pandas_properties):
-        custom_properties = pandas_properties.copy()
-        custom_properties.pop("db_extra_args")
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-
-        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
-            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
-        ) as create_table_mock:
-            cursor_mock = engine_mock.return_value.__enter__.return_value
-            cursor_mock.execute.side_effect = None
-
-            with patch(f"taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode{called_func}") as mck:
-                dn.write(data)
-                mck.assert_called_once_with(written_data, create_table_mock.return_value, cursor_mock, True)
-
     @pytest.mark.parametrize("pandas_properties", __pandas_properties)
     def test_raise_error_invalid_exposed_type(self, pandas_properties):
         custom_properties = pandas_properties.copy()
@@ -264,47 +144,6 @@ class TestSQLTableDataNode:
         with pytest.raises(InvalidExposedType):
             SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
 
-    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
-    def test_write_dataframe(self, pandas_properties):
-        # test write pandas dataframe
-        custom_properties = pandas_properties.copy()
-        custom_properties.pop("db_extra_args")
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-
-        df = pd.DataFrame({"a": [1, 2, 3, 4], "b": [5, 6, 7, 8]})
-        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
-            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
-        ):
-            cursor_mock = engine_mock.return_value.__enter__.return_value
-            cursor_mock.execute.side_effect = None
-
-            with patch("taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode__insert_dataframe") as mck:
-                dn.write(df)
-                assert mck.call_args[0][0].equals(df)
-
-    @pytest.mark.parametrize(
-        "data",
-        [
-            [],
-            np.array([]),
-        ],
-    )
-    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
-    def test_write_empty_list(self, data, pandas_properties):
-        custom_properties = pandas_properties.copy()
-        custom_properties.pop("db_extra_args")
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
-
-        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
-            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
-        ) as create_table_mock:
-            cursor_mock = engine_mock.return_value.__enter__.return_value
-            cursor_mock.execute.side_effect = None
-
-            with patch("taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode__delete_all_rows") as mck:
-                dn.write(data)
-                mck.assert_called_once_with(create_table_mock.return_value, cursor_mock, True)
-
     @pytest.mark.parametrize("pandas_properties", __pandas_properties)
     @patch("pandas.read_sql_query")
     def test_engine_cache(self, _, pandas_properties):
@@ -328,172 +167,8 @@ class TestSQLTableDataNode:
             dn.db_username = "foo"
             assert dn._engine is None
 
-            dn.write(1)
+            dn.write({})
             assert dn._engine is not None
 
             dn.some_random_attribute_that_does_not_related_to_engine = "foo"
             assert dn._engine is not None
-
-    @pytest.mark.parametrize(
-        "tmp_sqlite_path",
-        [
-            "tmp_sqlite_db_file_path",
-            "tmp_sqlite_sqlite3_file_path",
-        ],
-    )
-    def test_sqlite_read_file_with_different_extension(self, tmp_sqlite_path, request):
-        tmp_sqlite_path = request.getfixturevalue(tmp_sqlite_path)
-        folder_path, db_name, file_extension = tmp_sqlite_path
-        properties = {
-            "db_engine": "sqlite",
-            "table_name": "example",
-            "db_name": db_name,
-            "sqlite_folder_path": folder_path,
-            "sqlite_file_extension": file_extension,
-        }
-
-        dn = SQLTableDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
-        data = dn.read()
-
-        assert data.equals(pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}]))
-
-    def test_sqlite_append_pandas(self, tmp_sqlite_sqlite3_file_path):
-        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
-        properties = {
-            "db_engine": "sqlite",
-            "table_name": "example",
-            "db_name": db_name,
-            "sqlite_folder_path": folder_path,
-            "sqlite_file_extension": file_extension,
-        }
-
-        dn = SQLTableDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
-        original_data = pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
-        data = dn.read()
-        assert_frame_equal(data, original_data)
-
-        append_data_1 = pd.DataFrame([{"foo": 5, "bar": 6}, {"foo": 7, "bar": 8}])
-        dn.append(append_data_1)
-        assert_frame_equal(dn.read(), pd.concat([original_data, append_data_1]).reset_index(drop=True))
-
-    def test_filter_pandas_exposed_type(self, tmp_sqlite_sqlite3_file_path):
-        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
-        properties = {
-            "db_engine": "sqlite",
-            "table_name": "example",
-            "db_name": db_name,
-            "sqlite_folder_path": folder_path,
-            "sqlite_file_extension": file_extension,
-            "exposed_type": "pandas",
-        }
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
-        dn.write(
-            pd.DataFrame(
-                [
-                    {"foo": 1, "bar": 1},
-                    {"foo": 1, "bar": 2},
-                    {"foo": 1, "bar": 3},
-                    {"foo": 2, "bar": 1},
-                    {"foo": 2, "bar": 2},
-                    {"foo": 2, "bar": 3},
-                ]
-            )
-        )
-
-        # Test datanode indexing and slicing
-        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, 2, 2]))
-        assert dn["bar"].equals(pd.Series([1, 2, 3, 1, 2, 3]))
-        assert dn[:2].equals(pd.DataFrame([{"foo": 1, "bar": 1}, {"foo": 1, "bar": 2}]))
-
-        # Test filter data
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
-        filtered_by_indexing = dn[dn["foo"] == 1]
-        expected_data = pd.DataFrame([{"foo": 1, "bar": 1}, {"foo": 1, "bar": 2}, {"foo": 1, "bar": 3}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
-        filtered_by_indexing = dn[dn["foo"] != 1]
-        expected_data = pd.DataFrame([{"foo": 2, "bar": 1}, {"foo": 2, "bar": 2}, {"foo": 2, "bar": 3}])
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
-        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
-        expected_data = pd.DataFrame(
-            [
-                {"foo": 1, "bar": 1},
-                {"foo": 1, "bar": 2},
-                {"foo": 2, "bar": 1},
-                {"foo": 2, "bar": 2},
-            ]
-        )
-        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
-        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)
-
-    def test_filter_numpy_exposed_type(self, tmp_sqlite_sqlite3_file_path):
-        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
-        properties = {
-            "db_engine": "sqlite",
-            "table_name": "example",
-            "db_name": db_name,
-            "sqlite_folder_path": folder_path,
-            "sqlite_file_extension": file_extension,
-            "exposed_type": "numpy",
-        }
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
-        dn.write(
-            pd.DataFrame(
-                [
-                    {"foo": 1, "bar": 1},
-                    {"foo": 1, "bar": 2},
-                    {"foo": 1, "bar": 3},
-                    {"foo": 2, "bar": 1},
-                    {"foo": 2, "bar": 2},
-                    {"foo": 2, "bar": 3},
-                ]
-            )
-        )
-
-        # Test datanode indexing and slicing
-        assert np.array_equal(dn[0], np.array([1, 1]))
-        assert np.array_equal(dn[1], np.array([1, 2]))
-        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
-        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))
-
-        # Test filter data
-        assert np.array_equal(dn.filter(("foo", 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
-        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))
-
-        assert np.array_equal(dn.filter(("foo", 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
-        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))
-
-        assert np.array_equal(dn.filter(("bar", 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
-        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))
-
-        assert np.array_equal(
-            dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR),
-            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
-        )
-        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))
-
-    def test_filter_does_not_read_all_entities(self, tmp_sqlite_sqlite3_file_path):
-        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
-        properties = {
-            "db_engine": "sqlite",
-            "table_name": "example",
-            "db_name": db_name,
-            "sqlite_folder_path": folder_path,
-            "sqlite_file_extension": file_extension,
-            "exposed_type": "numpy",
-        }
-        dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=properties)
-
-        # SQLTableDataNode.filter() should not call the MongoCollectionDataNode._read() method
-        with patch.object(SQLTableDataNode, "_read") as read_mock:
-            dn.filter(("foo", 1, Operator.EQUAL))
-            dn.filter(("bar", 2, Operator.NOT_EQUAL))
-            dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
-
-            assert read_mock["_read"].call_count == 0

+ 236 - 0
tests/core/data/test_write_parquet_data_node.py

@@ -0,0 +1,236 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pathlib
+from importlib import util
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.parquet import ParquetDataNode
+
+
+@pytest.fixture(scope="function", autouse=True)
+def cleanup():
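+    # Remove the temporary parquet file written under data_sample, if any, after each test.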
+    yield
+    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+    if os.path.isfile(path):
+        os.remove(path)
+
+
+class MyCustomObject:
+    def __init__(self, id, integer, text):
+        self.id = id
+        self.integer = integer
+        self.text = text
+
+    def __eq__(self, value) -> bool:
+        return self.id == value.id and self.integer == value.integer and self.text == value.text
+
+
+class MyOtherCustomObject:
+    def __init__(self, id, sentence):
+        self.id = id
+        self.sentence = sentence
+
+
+def create_custom_class(**kwargs):
+    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
+
+
+class TestWriteParquetDataNode:
+    __engine = ["pyarrow"]
+    if util.find_spec("fastparquet"):
+        __engine.append("fastparquet")
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_read_write_after_modify_path(self, engine):
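+        # After switching the path to a file that does not exist yet, reading should fail
+        # until the previously read data is written back to the new location.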
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
+        read_data = dn.read()
+        assert read_data is not None
+        dn.path = new_path
+        with pytest.raises(FileNotFoundError):
+            dn.read()
+        dn.write(read_data)
+        assert dn.read().equals(read_data)
+
+    def test_write_pandas(self, tmpdir_factory):
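+        # A DataFrame, a single column, a Series, and None should all be written as a
+        # parquet file that reads back as the equivalent DataFrame (empty for None).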
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
+
+        df = pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])
+        parquet_dn.write(df)
+
+        assert pathlib.Path(temp_file_path).exists()
+
+        dn_data = parquet_dn.read()
+
+        assert isinstance(dn_data, pd.DataFrame)
+        assert dn_data.equals(df)
+
+        parquet_dn.write(df["a"])
+        assert parquet_dn.read().equals(df[["a"]])
+
+        series = pd.Series([1, 2, 3])
+        parquet_dn.write(series)
+        assert np.array_equal(parquet_dn.read().to_numpy(), pd.DataFrame(series).to_numpy())
+
+        parquet_dn.write(None)
+        assert parquet_dn.read().empty
+
+    def test_write_numpy(self, tmpdir_factory):
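+        # With exposed_type "numpy", written arrays should read back as equal arrays,
+        # and writing None should produce an empty array.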
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"}
+        )
+
+        arr = np.array([[1], [2], [3], [4], [5]])
+        parquet_dn.write(arr)
+        assert np.array_equal(parquet_dn.read(), arr)
+
+        arr = arr[0:3]
+        parquet_dn.write(arr)
+        assert np.array_equal(parquet_dn.read(), arr)
+
+        parquet_dn.write(None)
+        assert parquet_dn.read().size == 0
+
+    def test_write_custom_exposed_type(self, tmpdir_factory):
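+        # With a custom class as exposed_type, writing a list of objects and reading it
+        # back should yield objects equal to the originals; writing None yields an empty list.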
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        parquet_dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject}
+        )
+
+        data = [MyCustomObject(0, 1, "hi"), MyCustomObject(1, 2, "world"), MyCustomObject(2, 3, "text")]
+        parquet_dn.write(data)
+        assert all(actual == expected for actual, expected in zip(parquet_dn.read(), data))
+
+        parquet_dn.write(None)
+        assert parquet_dn.read() == []
+
+    @pytest.mark.parametrize("engine", __engine)
+    def test_write_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
+        # Precedence:
+        # 1. Keyword arguments passed directly to the read_with_kwargs / write_with_kwargs methods
+        # 2. Keyword arguments defined in the read_kwargs / write_kwargs properties
+        # 3. Keyword arguments defined as top-level properties
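+        # For example, with "compression": "snappy" in the properties and
+        # "write_kwargs": {"compression": "gzip"}, dn.write(df) is expected to use gzip,
+        # while dn.write_with_kwargs(df, compression="brotli") takes precedence over both.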
+
+        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
+        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
+        df = default_data_frame.copy(deep=True)
+
+        # Write
+        # 3
+        comp3 = "snappy"
+        dn = ParquetDataNode(
+            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
+        )
+        dn.write(df)
+        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # 3 and 2
+        comp2 = "gzip"
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={
+                "path": temp_file_path,
+                "engine": engine,
+                "compression": comp3,
+                "write_kwargs": {"compression": comp2},
+            },
+        )
+        dn.write(df)
+        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # 3, 2 and 1
+        comp1 = "brotli"
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={
+                "path": temp_file_path,
+                "engine": engine,
+                "compression": comp3,
+                "write_kwargs": {"compression": comp2},
+            },
+        )
+        dn.write_with_kwargs(df, compression=comp1)
+        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
+        with open(temp_file_2_path, "rb") as tf:
+            with pathlib.Path(temp_file_path).open("rb") as f:
+                assert f.read() == tf.read()
+
+        # Read
+        df.to_parquet(temp_file_path, engine=engine)
+        # 2
+        cols2 = ["a", "b"]
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
+        )
+        assert set(dn.read().columns) == set(cols2)
+
+        # 1
+        cols1 = ["a"]
+        dn = ParquetDataNode(
+            "foo",
+            Scope.SCENARIO,
+            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
+        )
+        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)
+
+    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
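+        # Writing with partition_cols should create a partitioned dataset directory
+        # rather than a single file, and the content should still round-trip.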
+        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))
+
+        write_kwargs = {"partition_cols": ["a", "b"]}
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs})  # type: ignore
+        dn.write(default_data_frame)
+
+        assert pathlib.Path(temp_dir_path).is_dir()
+        # dtypes change during round-trip with partition_cols
+        pd.testing.assert_frame_equal(
+            dn.read().sort_index(axis=1),
+            default_data_frame.sort_index(axis=1),
+            check_dtype=False,
+            check_categorical=False,
+        )
+
+    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
+    @pytest.mark.parametrize(
+        "content",
+        [
+            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
+            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
+        ],
+    )
+    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
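+        # Appending either a list of records or a DataFrame should concatenate the new
+        # rows after the existing content of the parquet file.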
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
+        assert_frame_equal(dn.read(), default_data_frame)
+
+        dn.append(content)
+        assert_frame_equal(
+            dn.read(),
+            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
+        )

+ 186 - 0
tests/core/data/test_write_sql_table_data_node.py

@@ -0,0 +1,186 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from importlib import util
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+import pytest
+from pandas.testing import assert_frame_equal
+
+from taipy.config.common.scope import Scope
+from taipy.core.data.sql_table import SQLTableDataNode
+
+
+class MyCustomObject:
+    def __init__(self, x=None, y=None):
+        self.x = x
+        self.y = y
+
+
+class TestWriteSQLTableDataNode:
+    __pandas_properties = [
+        {
+            "db_name": "taipy",
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_extra_args": {
+                "TrustServerCertificate": "yes",
+                "other": "value",
+            },
+        },
+    ]
+
+    if util.find_spec("pyodbc"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mssql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("pymysql"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "mysql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    if util.find_spec("psycopg2"):
+        __pandas_properties.append(
+            {
+                "db_username": "sa",
+                "db_password": "Passw0rd",
+                "db_name": "taipy",
+                "db_engine": "postgresql",
+                "table_name": "example",
+                "db_extra_args": {
+                    "TrustServerCertificate": "yes",
+                },
+            },
+        )
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_write_pandas(self, pandas_properties):
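+        # The engine and table creation are patched, so this only checks that each write
+        # (DataFrame, single column, empty frame, Series, None) calls __insert_dataframe once.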
+        custom_properties = pandas_properties.copy()
+        custom_properties.pop("db_extra_args")
+        sql_table_dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
+            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
+        ) as _:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.side_effect = None
+
+            with patch("taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode__insert_dataframe") as mck:
+                df = pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])
+                sql_table_dn.write(df)
+                assert mck.call_count == 1
+
+                sql_table_dn.write(df["a"])
+                assert mck.call_count == 2
+
+                sql_table_dn.write(pd.DataFrame())
+                assert mck.call_count == 3
+
+                series = pd.Series([1, 2, 3])
+                sql_table_dn.write(series)
+                assert mck.call_count == 4
+
+                sql_table_dn.write(None)
+                assert mck.call_count == 5
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_write_numpy(self, pandas_properties):
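+        # Same setup with exposed_type "numpy": every write, including an empty array
+        # and None, should go through __insert_dataframe exactly once.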
+        custom_properties = pandas_properties.copy()
+        custom_properties["exposed_type"] = "numpy"
+        custom_properties.pop("db_extra_args")
+        sql_table_dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
+            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
+        ) as _:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.side_effect = None
+
+            with patch("taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode__insert_dataframe") as mck:
+                arr = np.array([[1], [2], [3], [4], [5]])
+                sql_table_dn.write(arr)
+                assert mck.call_count == 1
+
+                sql_table_dn.write(arr[0:3])
+                assert mck.call_count == 2
+
+                sql_table_dn.write(np.array([]))
+                assert mck.call_count == 3
+
+                sql_table_dn.write(None)
+                assert mck.call_count == 4
+
+    @pytest.mark.parametrize("pandas_properties", __pandas_properties)
+    def test_write_custom_exposed_type(self, pandas_properties):
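+        # Custom objects (including ones with None attributes) and None itself should be
+        # converted and passed to __insert_dataframe, once per write call.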
+        custom_properties = pandas_properties.copy()
+        custom_properties["exposed_type"] = MyCustomObject
+        custom_properties.pop("db_extra_args")
+        sql_table_dn = SQLTableDataNode("foo", Scope.SCENARIO, properties=custom_properties)
+
+        with patch("sqlalchemy.engine.Engine.connect") as engine_mock, patch(
+            "taipy.core.data.sql_table.SQLTableDataNode._create_table"
+        ) as _:
+            cursor_mock = engine_mock.return_value.__enter__.return_value
+            cursor_mock.execute.side_effect = None
+
+            with patch("taipy.core.data.sql_table.SQLTableDataNode._SQLTableDataNode__insert_dataframe") as mck:
+                custom_data = [
+                    MyCustomObject(1, 2),
+                    MyCustomObject(3, 4),
+                    MyCustomObject(None, 2),
+                    MyCustomObject(1, None),
+                    MyCustomObject(None, None),
+                ]
+                sql_table_dn.write(custom_data)
+                assert mck.call_count == 1
+
+                sql_table_dn.write(None)
+                assert mck.call_count == 2
+
+    def test_sqlite_append_pandas(self, tmp_sqlite_sqlite3_file_path):
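+        # Against a real temporary SQLite file, append() should add the new rows after
+        # the rows already present in the table.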
+        folder_path, db_name, file_extension = tmp_sqlite_sqlite3_file_path
+        properties = {
+            "db_engine": "sqlite",
+            "table_name": "example",
+            "db_name": db_name,
+            "sqlite_folder_path": folder_path,
+            "sqlite_file_extension": file_extension,
+        }
+
+        dn = SQLTableDataNode("sqlite_dn", Scope.SCENARIO, properties=properties)
+        original_data = pd.DataFrame([{"foo": 1, "bar": 2}, {"foo": 3, "bar": 4}])
+        data = dn.read()
+        assert_frame_equal(data, original_data)
+
+        append_data_1 = pd.DataFrame([{"foo": 5, "bar": 6}, {"foo": 7, "bar": 8}])
+        dn.append(append_data_1)
+        assert_frame_equal(dn.read(), pd.concat([original_data, append_data_1]).reset_index(drop=True))