# Copyright 2021-2024 Avaiga Private Limited # # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. import os import pathlib from datetime import datetime from importlib import util from time import sleep import numpy as np import pandas as pd import pytest from pandas.testing import assert_frame_equal from taipy.config.common.scope import Scope from taipy.config.config import Config from taipy.config.exceptions.exceptions import InvalidConfigurationId from taipy.core.data._data_manager import _DataManager from taipy.core.data.data_node_id import DataNodeId from taipy.core.data.operator import JoinOperator, Operator from taipy.core.data.parquet import ParquetDataNode from taipy.core.exceptions.exceptions import ( InvalidExposedType, NoData, UnknownCompressionAlgorithm, UnknownParquetEngine, ) @pytest.fixture(scope="function", autouse=True) def cleanup(): yield path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet") if os.path.isfile(path): os.remove(path) class MyCustomObject: def __init__(self, id, integer, text): self.id = id self.integer = integer self.text = text class MyOtherCustomObject: def __init__(self, id, sentence): self.id = id self.sentence = sentence def create_custom_class(**kwargs): return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"]) class TestParquetDataNode: __engine = ["pyarrow"] if util.find_spec("fastparquet"): __engine.append("fastparquet") def test_create(self): path = "data/node/path" compression = "snappy" dn = ParquetDataNode( "foo_bar", Scope.SCENARIO, properties={"path": path, "compression": compression, "name": "super name"} ) assert isinstance(dn, ParquetDataNode) assert dn.storage_type() == "parquet" assert dn.config_id == "foo_bar" assert dn.name == "super name" assert dn.scope == Scope.SCENARIO assert dn.id is not None assert dn.owner_id is None assert dn.last_edit_date is None assert dn.job_ids == [] assert not dn.is_ready_for_reading assert dn.path == path assert dn.exposed_type == "pandas" assert dn.compression == "snappy" assert dn.engine == "pyarrow" with pytest.raises(InvalidConfigurationId): dn = ParquetDataNode("foo bar", Scope.SCENARIO, properties={"path": path, "name": "super name"}) def test_get_user_properties(self, parquet_file_path): dn_1 = ParquetDataNode("dn_1", Scope.SCENARIO, properties={"path": parquet_file_path}) assert dn_1._get_user_properties() == {} dn_2 = ParquetDataNode( "dn_2", Scope.SCENARIO, properties={ "exposed_type": "numpy", "default_data": "foo", "default_path": parquet_file_path, "engine": "pyarrow", "compression": "snappy", "read_kwargs": {"columns": ["a", "b"]}, "write_kwargs": {"index": False}, "foo": "bar", }, ) # exposed_type, default_data, default_path, path, engine, compression, read_kwargs, write_kwargs # are filtered out assert dn_2._get_user_properties() == {"foo": "bar"} def test_new_parquet_data_node_with_existing_file_is_ready_for_reading(self, parquet_file_path): not_ready_dn_cfg = Config.configure_data_node( "not_ready_data_node_config_id", "parquet", path="NOT_EXISTING.parquet" ) not_ready_dn = _DataManager._bulk_get_or_create([not_ready_dn_cfg])[not_ready_dn_cfg] assert not not_ready_dn.is_ready_for_reading ready_dn_cfg = Config.configure_data_node("ready_data_node_config_id", "parquet", path=parquet_file_path) ready_dn = _DataManager._bulk_get_or_create([ready_dn_cfg])[ready_dn_cfg] assert ready_dn.is_ready_for_reading @pytest.mark.parametrize( ["properties", "exists"], [ ({}, False), ({"default_data": {"a": ["foo", "bar"]}}, True), ], ) def test_create_with_default_data(self, properties, exists): dn = ParquetDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties) assert os.path.exists(dn.path) is exists @pytest.mark.parametrize("engine", __engine) def test_modin_deprecated_in_favor_of_pandas(self, engine, parquet_file_path): # Create ParquetDataNode with modin exposed_type props = {"path": parquet_file_path, "exposed_type": "modin", "engine": engine} parquet_data_node_as_modin = ParquetDataNode("bar", Scope.SCENARIO, properties=props) assert parquet_data_node_as_modin.properties["exposed_type"] == "pandas" data_modin = parquet_data_node_as_modin.read() assert isinstance(data_modin, pd.DataFrame) @pytest.mark.parametrize("engine", __engine) def test_read_file(self, engine, parquet_file_path): not_existing_parquet = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine} ) with pytest.raises(NoData): assert not_existing_parquet.read() is None not_existing_parquet.read_or_raise() df = pd.read_parquet(parquet_file_path) # Create ParquetDataNode without exposed_type (Default is pandas.DataFrame) parquet_data_node_as_pandas = ParquetDataNode( "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine} ) data_pandas = parquet_data_node_as_pandas.read() assert isinstance(data_pandas, pd.DataFrame) assert len(data_pandas) == 2 assert data_pandas.equals(df) assert np.array_equal(data_pandas.to_numpy(), df.to_numpy()) # Create ParquetDataNode with numpy exposed_type parquet_data_node_as_numpy = ParquetDataNode( "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine} ) data_numpy = parquet_data_node_as_numpy.read() assert isinstance(data_numpy, np.ndarray) assert len(data_numpy) == 2 assert np.array_equal(data_numpy, df.to_numpy()) @pytest.mark.parametrize("engine", __engine) def test_read_folder(self, engine): parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example") df = pd.read_parquet(parquet_folder_path) parquet_data_node_as_pandas = ParquetDataNode( "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine} ) data_pandas = parquet_data_node_as_pandas.read() assert isinstance(data_pandas, pd.DataFrame) assert len(data_pandas) == 5 assert data_pandas.equals(df) assert np.array_equal(data_pandas.to_numpy(), df.to_numpy()) def test_set_path(self): dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"}) assert dn.path == "foo.parquet" dn.path = "bar.parquet" assert dn.path == "bar.parquet" @pytest.mark.parametrize("engine", __engine) def test_read_write_after_modify_path(self, engine): path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet") new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet") dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine}) read_data = dn.read() assert read_data is not None dn.path = new_path with pytest.raises(FileNotFoundError): dn.read() dn.write(read_data) assert dn.read().equals(read_data) def test_read_custom_exposed_type(self): example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet") dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject} ) assert all(isinstance(obj, MyCustomObject) for obj in dn.read()) dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class} ) assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read()) def test_raise_error_unknown_parquet_engine(self): path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet") with pytest.raises(UnknownParquetEngine): ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": "foo"}) def test_raise_error_unknown_compression_algorithm(self): path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet") with pytest.raises(UnknownCompressionAlgorithm): ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "compression": "foo"}) def test_raise_error_invalid_exposed_type(self): path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet") with pytest.raises(InvalidExposedType): ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"}) def test_read_empty_data(self, tmpdir_factory): temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet")) empty_df = pd.DataFrame([]) empty_df.to_parquet(temp_file_path) # Pandas dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"}) assert dn.read().equals(empty_df) # Numpy dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"}) assert np.array_equal(dn.read(), empty_df.to_numpy()) # Custom dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject}) assert dn.read() == [] def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory): temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet")) pd.DataFrame([]).to_parquet(temp_file_path) dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"}) dn.write(pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})) previous_edit_date = dn.last_edit_date sleep(0.1) pd.DataFrame(pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]})).to_parquet(temp_file_path) new_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path)) assert previous_edit_date < dn.last_edit_date assert new_edit_date == dn.last_edit_date sleep(0.1) dn.write(pd.DataFrame(data={"col1": [9, 10], "col2": [10, 12]})) assert new_edit_date < dn.last_edit_date os.unlink(temp_file_path) def test_get_system_folder_modified_date_instead_of_last_edit_date(self, tmpdir_factory): temp_folder_path = tmpdir_factory.mktemp("data").strpath temp_file_path = os.path.join(temp_folder_path, "temp.parquet") pd.DataFrame([]).to_parquet(temp_file_path) dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_folder_path}) initial_edit_date = dn.last_edit_date # Sleep so that the file can be created successfully on Ubuntu sleep(0.1) pd.DataFrame(pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})).to_parquet(temp_file_path) first_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path)) assert dn.last_edit_date > initial_edit_date assert dn.last_edit_date == first_edit_date sleep(0.1) pd.DataFrame(pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]})).to_parquet(temp_file_path) second_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path)) assert dn.last_edit_date > first_edit_date assert dn.last_edit_date == second_edit_date os.unlink(temp_file_path) @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed") @pytest.mark.parametrize( "content", [ ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]), (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])), ], ) def test_append_pandas(self, parquet_file_path, default_data_frame, content): dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path}) assert_frame_equal(dn.read(), default_data_frame) dn.append(content) assert_frame_equal( dn.read(), pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True), ) @pytest.mark.parametrize( "data", [ [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}], pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]), ], ) def test_write_to_disk(self, tmpdir_factory, data): temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet")) dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path}) dn.write(data) assert pathlib.Path(temp_file_path).exists() assert isinstance(dn.read(), pd.DataFrame) def test_filter_pandas_exposed_type(self, parquet_file_path): dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"}) dn.write( [ {"foo": 1, "bar": 1}, {"foo": 1, "bar": 2}, {"foo": 1}, {"foo": 2, "bar": 2}, {"bar": 2}, ] ) # Test datanode indexing and slicing assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None])) assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2])) assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}])) # Test filter data filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL)) filtered_by_indexing = dn[dn["foo"] == 1] expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}]) assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data) assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data) filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL)) filtered_by_indexing = dn[dn["foo"] != 1] expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}]) assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data) assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data) filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL)) filtered_by_indexing = dn[dn["bar"] == 2] expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}]) assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data) assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data) filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR) filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)] expected_data = pd.DataFrame( [ {"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}, ] ) assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data) assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data) def test_filter_numpy_exposed_type(self, parquet_file_path): dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"}) dn.write( [ [1, 1], [1, 2], [1, 3], [2, 1], [2, 2], [2, 3], ] ) # Test datanode indexing and slicing assert np.array_equal(dn[0], np.array([1, 1])) assert np.array_equal(dn[1], np.array([1, 2])) assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]])) assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2])) assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]])) # Test filter data assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]])) assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]])) assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]])) assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]])) assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]])) assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]])) assert np.array_equal( dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR), np.array([[1, 1], [1, 2], [2, 1], [2, 2]]), ) assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]])) @pytest.mark.parametrize("engine", __engine) def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory): read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]} temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet")) dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs} ) df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv")) dn.write(df) assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"} assert set(dn.read().columns) == set(read_kwargs["columns"]) # !!! filter doesn't work with `fastparquet` without partition_cols if engine == "pyarrow": assert len(dn.read()) != len(df) assert len(dn.read()) == 2 @pytest.mark.parametrize("engine", __engine) def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame): # Precedence: # 1. Class read/write methods # 2. Defined in read_kwargs and write_kwargs, in properties # 3. Defined top-level in properties temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet")) temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet")) df = default_data_frame.copy(deep=True) # Write # 3 comp3 = "snappy" dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3} ) dn.write(df) df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine) with open(temp_file_2_path, "rb") as tf: with pathlib.Path(temp_file_path).open("rb") as f: assert f.read() == tf.read() # 3 and 2 comp2 = "gzip" dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={ "path": temp_file_path, "engine": engine, "compression": comp3, "write_kwargs": {"compression": comp2}, }, ) dn.write(df) df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine) with open(temp_file_2_path, "rb") as tf: with pathlib.Path(temp_file_path).open("rb") as f: assert f.read() == tf.read() # 3, 2 and 1 comp1 = "brotli" dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={ "path": temp_file_path, "engine": engine, "compression": comp3, "write_kwargs": {"compression": comp2}, }, ) dn.write_with_kwargs(df, compression=comp1) df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine) with open(temp_file_2_path, "rb") as tf: with pathlib.Path(temp_file_path).open("rb") as f: assert f.read() == tf.read() # Read df.to_parquet(temp_file_path, engine=engine) # 2 cols2 = ["a", "b"] dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}}, ) assert set(dn.read().columns) == set(cols2) # 1 cols1 = ["a"] dn = ParquetDataNode( "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}}, ) assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1) def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame): temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir")) write_kwargs = {"partition_cols": ["a", "b"]} dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs}) # type: ignore dn.write(default_data_frame) assert pathlib.Path(temp_dir_path).is_dir() # dtypes change during round-trip with partition_cols pd.testing.assert_frame_equal( dn.read().sort_index(axis=1), default_data_frame.sort_index(axis=1), check_dtype=False, check_categorical=False, ) def test_read_with_kwargs_never_written(self): path = "data/node/path" dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path}) assert dn.read_with_kwargs() is None