@@ -16,8 +16,10 @@ from datetime import datetime
 from importlib import util
 from time import sleep
 
+import numpy as np
 import pandas as pd
 import pytest
+from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
@@ -221,3 +223,94 @@ class TestParquetDataNode:
 
         assert ".data" not in dn.path
         assert os.path.exists(dn.path)
+
+    def test_get_downloadable_path(self):
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+        assert dn._get_downloadable_path() == path
+
+    def test_get_downloadable_path_with_not_existing_file(self):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "NOT_EXISTING.parquet"})
+        assert dn._get_downloadable_path() == ""
+
+    def test_get_downloadable_path_as_directory_should_return_nothing(self):
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
+        assert dn._get_downloadable_path() == ""
+
+    def test_upload(self, parquet_file_path, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "pandas"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
+        upload_content = pd.read_parquet(parquet_file_path)
+        dn._upload(parquet_file_path)
+
+        assert_frame_equal(dn.read(), upload_content)  # The content of the dn should change to the uploaded content
+        assert dn.last_edit_date > old_last_edit_date
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+    def test_upload_with_upload_check_pandas(self, parquet_file_path, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "pandas"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
+        def check_data_column(upload_path, upload_data):
+            return upload_path.endswith(".parquet") and upload_data.columns.tolist() == ["a", "b", "c"]
+
+        wrong_format_not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
+        wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
+        pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]).to_parquet(
+            wrong_format_parquet_path, index=False
+        )
+
+        # The upload should fail when the file is not a Parquet file
+        assert not dn._upload(wrong_format_not_parquet_path, upload_checker=check_data_column)
+
+        # The upload should fail when check_data_column() returns False
+        assert not dn._upload(wrong_format_parquet_path, upload_checker=check_data_column)
+
+        assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when the upload fails
+        assert dn.last_edit_date == old_last_edit_date  # The last edit date should not change when the upload fails
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+        # The upload should succeed when check_data_column() returns True
+        assert dn._upload(parquet_file_path, upload_checker=check_data_column)
+
+    def test_upload_with_upload_check_numpy(self, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = np.array([[1, 2, 3], [4, 5, 6]])
+
+        new_parquet_path = tmpdir_factory.mktemp("data").join("new_upload_data.parquet").strpath
+        new_data = np.array([[1, 2, 3], [4, 5, 6]])
+        pd.DataFrame(new_data).to_parquet(new_parquet_path, index=False)
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "numpy"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
+        def check_data_is_positive(upload_path, upload_data):
+            return upload_path.endswith(".parquet") and np.all(upload_data > 0)
+
+        wrong_format_not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
+        wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
+        pd.DataFrame(np.array([[-1, 2, 3], [-4, -5, -6]])).to_parquet(wrong_format_parquet_path, index=False)
+
+        # The upload should fail when the file is not a Parquet file
+        assert not dn._upload(wrong_format_not_parquet_path, upload_checker=check_data_is_positive)
+
+        # The upload should fail when check_data_is_positive() returns False
+        assert not dn._upload(wrong_format_parquet_path, upload_checker=check_data_is_positive)
+
+        assert np.array_equal(dn.read(), old_data)  # The content of the dn should not change when the upload fails
+        assert dn.last_edit_date == old_last_edit_date  # The last edit date should not change when the upload fails
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+        # The upload should succeed when check_data_is_positive() returns True
+        assert dn._upload(new_parquet_path, upload_checker=check_data_is_positive)
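
A minimal sketch of the upload_checker contract the tests above exercise, inferred
from their assertions rather than from a documented API: the checker is called with
the uploaded file's path and its content read with the data node's exposed_type (a
pandas DataFrame for "pandas", a numpy array for "numpy"), and _upload replaces the
node's content only when the checker returns True; on failure the previous content,
last_edit_date, and path are all left untouched. The checker name below is
illustrative, not part of the patch:

    def require_expected_schema(upload_path, upload_data):
        # Hypothetical checker: accept only .parquet files whose columns
        # match the schema downstream consumers expect.
        return upload_path.endswith(".parquet") and list(upload_data.columns) == ["a", "b", "c"]

    dn._upload("new_data.parquet", upload_checker=require_expected_schema)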