@@ -12,12 +12,15 @@
 import os
 import pathlib
 import uuid
-from datetime import datetime
+from datetime import datetime, timedelta
 from importlib import util
 from time import sleep
 
+import freezegun
+import numpy as np
 import pandas as pd
 import pytest
+from pandas.testing import assert_frame_equal
 
 from taipy.config.common.scope import Scope
 from taipy.config.config import Config
@@ -230,3 +233,139 @@ class TestParquetDataNode:
 
         assert ".data" not in dn.path
         assert os.path.exists(dn.path)
+
+    def test_get_downloadable_path(self):
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "pandas"})
+        assert dn._get_downloadable_path() == path
+
+    def test_get_downloadable_path_with_not_existing_file(self):
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "NOT_EXISTING.parquet"})
+        assert dn._get_downloadable_path() == ""
+
+    def test_get_downloadable_path_as_directory_should_return_nothing(self):
+        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
+        assert dn._get_downloadable_path() == ""
+
+    def test_upload(self, parquet_file_path, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "pandas"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
+        upload_content = pd.read_parquet(parquet_file_path)
+
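+        # Freeze time one second after the previous edit so the upload stamps a strictly later last_edit_date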
+        with freezegun.freeze_time(old_last_edit_date + timedelta(seconds=1)):
+            dn._upload(parquet_file_path)
+
+        assert_frame_equal(dn.read(), upload_content)  # The content of the dn should change to the uploaded content
+        assert dn.last_edit_date > old_last_edit_date
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+    def test_upload_with_upload_check_pandas(self, parquet_file_path, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = pd.DataFrame([{"a": 0, "b": 1, "c": 2}, {"a": 3, "b": 4, "c": 5}])
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "pandas"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
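+        # Upload checker: accept only .parquet files whose columns are exactly ["a", "b", "c"]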
+        def check_data_column(upload_path, upload_data):
+            return upload_path.endswith(".parquet") and upload_data.columns.tolist() == ["a", "b", "c"]
+
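+        # The upload should fail when the target file does not exist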
+        not_exists_parquet_path = tmpdir_factory.mktemp("data").join("not_exists.parquet").strpath
+        reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_column)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
+            f' therefore is not a valid data file for data node "{dn.id}"'
+        )
+
+        not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
+        old_data.to_parquet(not_parquet_path, index=False)
+        # The upload should fail when the file extension is not .parquet
+        reasons = dn._upload(not_parquet_path, upload_checker=check_data_column)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0])
+            == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
+        )
+
+        wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
+        pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]).to_parquet(
+            wrong_format_parquet_path, index=False
+        )
+        # The upload should fail when check_data_column() returns False
+        reasons = dn._upload(wrong_format_parquet_path, upload_checker=check_data_column)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0])
+            == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
+        )
+
+        assert_frame_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails
+        assert dn.last_edit_date == old_last_edit_date  # The last edit date should not change when upload fails
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+        # The upload should succeed when check_data_column() returns True
+        assert dn._upload(parquet_file_path, upload_checker=check_data_column)
+
+    def test_upload_with_upload_check_numpy(self, tmpdir_factory):
+        old_parquet_path = tmpdir_factory.mktemp("data").join("df.parquet").strpath
+        old_data = np.array([[1, 2, 3], [4, 5, 6]])
+
+        new_parquet_path = tmpdir_factory.mktemp("data").join("new_upload_data.parquet").strpath
+        new_data = np.array([[1, 2, 3], [4, 5, 6]])
+        pd.DataFrame(new_data, columns=["a", "b", "c"]).to_parquet(new_parquet_path, index=False)
+
+        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": old_parquet_path, "exposed_type": "numpy"})
+        dn.write(old_data)
+        old_last_edit_date = dn.last_edit_date
+
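+        # Upload checker: accept only .parquet files whose values are all strictly positive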
+        def check_data_is_positive(upload_path, upload_data):
+            return upload_path.endswith(".parquet") and np.all(upload_data > 0)
+
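+        # The upload should fail when the target file does not exist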
+        not_exists_parquet_path = tmpdir_factory.mktemp("data").join("not_exists.parquet").strpath
+        reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_is_positive)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
+            f' therefore is not a valid data file for data node "{dn.id}"'
+        )
+
+        not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
+        pd.DataFrame(old_data, columns=["a", "b", "c"]).to_parquet(not_parquet_path, index=False)
+        # The upload should fail when the file extension is not .parquet
+        reasons = dn._upload(not_parquet_path, upload_checker=check_data_is_positive)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0])
+            == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
+        )
+
+        wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
+        pd.DataFrame(np.array([[-1, 2, 3], [-4, -5, -6]]), columns=["a", "b", "c"]).to_parquet(
+            wrong_format_parquet_path, index=False
+        )
+        # The upload should fail when check_data_is_positive() returns False
+        reasons = dn._upload(wrong_format_parquet_path, upload_checker=check_data_is_positive)
+        assert bool(reasons) is False
+        assert (
+            str(list(reasons._reasons[dn.id])[0])
+            == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
+        )
+
+        assert np.array_equal(dn.read(), old_data)  # The content of the dn should not change when upload fails
+        assert dn.last_edit_date == old_last_edit_date  # The last edit date should not change when upload fails
+        assert dn.path == old_parquet_path  # The path of the dn should not change
+
+        # The upload should succeed when check_data_is_positive() returns True
+        assert dn._upload(new_parquet_path, upload_checker=check_data_is_positive)