|
@@ -267,18 +267,35 @@ class TestParquetDataNode:
|
|
def check_data_column(upload_path, upload_data):
|
|
def check_data_column(upload_path, upload_data):
|
|
return upload_path.endswith(".parquet") and upload_data.columns.tolist() == ["a", "b", "c"]
|
|
return upload_path.endswith(".parquet") and upload_data.columns.tolist() == ["a", "b", "c"]
|
|
|
|
|
|
- wrong_format_not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
|
|
|
|
- old_data.to_parquet(wrong_format_not_parquet_path, index=False)
|
|
|
|
- wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
|
|
|
|
- pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]).to_parquet(
|
|
|
|
- wrong_format_parquet_path, index=False
|
|
|
|
|
|
+ not_exists_parquet_path = tmpdir_factory.mktemp("data").join("not_exists.parquet").strpath
|
|
|
|
+ reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_column)
|
|
|
|
+ assert bool(reasons) is False
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
|
|
|
|
+ f' therefore is not a valid data file for data node "{dn.id}"'
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+ not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
|
|
|
|
+ old_data.to_parquet(not_parquet_path, index=False)
|
|
# The upload should fail when the file is not a parquet
|
|
# The upload should fail when the file is not a parquet
|
|
- assert not dn._upload(wrong_format_not_parquet_path, upload_checker=check_data_column)
|
|
|
|
|
|
+ reasons = dn._upload(not_parquet_path, upload_checker=check_data_column)
|
|
|
|
+ assert bool(reasons) is False
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0])
|
|
|
|
+ == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
|
|
|
|
+ )
|
|
|
|
|
|
|
|
+ wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
|
|
|
|
+ pd.DataFrame([{"a": 1, "b": 2, "d": 3}, {"a": 4, "b": 5, "d": 6}]).to_parquet(
|
|
|
|
+ wrong_format_parquet_path, index=False
|
|
|
|
+ )
|
|
# The upload should fail when check_data_column() return False
|
|
# The upload should fail when check_data_column() return False
|
|
- assert not dn._upload(wrong_format_parquet_path, upload_checker=check_data_column)
|
|
|
|
|
|
+ reasons = dn._upload(wrong_format_parquet_path, upload_checker=check_data_column)
|
|
|
|
+ assert bool(reasons) is False
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0])
|
|
|
|
+ == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
|
|
|
|
+ )
|
|
|
|
|
|
assert_frame_equal(dn.read(), old_data) # The content of the dn should not change when upload fails
|
|
assert_frame_equal(dn.read(), old_data) # The content of the dn should not change when upload fails
|
|
assert dn.last_edit_date == old_last_edit_date # The last edit date should not change when upload fails
|
|
assert dn.last_edit_date == old_last_edit_date # The last edit date should not change when upload fails
|
|
@@ -302,18 +319,33 @@ class TestParquetDataNode:
|
|
def check_data_is_positive(upload_path, upload_data):
|
|
def check_data_is_positive(upload_path, upload_data):
|
|
return upload_path.endswith(".parquet") and np.all(upload_data > 0)
|
|
return upload_path.endswith(".parquet") and np.all(upload_data > 0)
|
|
|
|
|
|
- wrong_format_not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
|
|
|
|
- pd.DataFrame(old_data, columns=["a", "b", "c"]).to_parquet(wrong_format_not_parquet_path, index=False)
|
|
|
|
- wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
|
|
|
|
- pd.DataFrame(np.array([[-1, 2, 3], [-4, -5, -6]]), columns=["a", "b", "c"]).to_parquet(
|
|
|
|
- wrong_format_parquet_path, index=False
|
|
|
|
|
|
+ not_exists_parquet_path = tmpdir_factory.mktemp("data").join("not_exists.parquet").strpath
|
|
|
|
+ reasons = dn._upload(not_exists_parquet_path, upload_checker=check_data_is_positive)
|
|
|
|
+ assert bool(reasons) is False
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0]) == "The uploaded file not_exists.parquet can not be read,"
|
|
|
|
+ f' therefore is not a valid data file for data node "{dn.id}"'
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+ not_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.not_parquet").strpath
|
|
|
|
+ pd.DataFrame(old_data, columns=["a", "b", "c"]).to_parquet(not_parquet_path, index=False)
|
|
# The upload should fail when the file is not a parquet
|
|
# The upload should fail when the file is not a parquet
|
|
- assert not dn._upload(wrong_format_not_parquet_path, upload_checker=check_data_is_positive)
|
|
|
|
|
|
+ reasons = dn._upload(not_parquet_path, upload_checker=check_data_is_positive)
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0])
|
|
|
|
+ == f'The uploaded file wrong_format_df.not_parquet has invalid data for data node "{dn.id}"'
|
|
|
|
+ )
|
|
|
|
|
|
|
|
+ wrong_format_parquet_path = tmpdir_factory.mktemp("data").join("wrong_format_df.parquet").strpath
|
|
|
|
+ pd.DataFrame(np.array([[-1, 2, 3], [-4, -5, -6]]), columns=["a", "b", "c"]).to_parquet(
|
|
|
|
+ wrong_format_parquet_path, index=False
|
|
|
|
+ )
|
|
# The upload should fail when check_data_is_positive() return False
|
|
# The upload should fail when check_data_is_positive() return False
|
|
- assert not dn._upload(wrong_format_parquet_path, upload_checker=check_data_is_positive)
|
|
|
|
|
|
+ reasons = dn._upload(wrong_format_parquet_path, upload_checker=check_data_is_positive)
|
|
|
|
+ assert (
|
|
|
|
+ str(list(reasons._reasons[dn.id])[0])
|
|
|
|
+ == f'The uploaded file wrong_format_df.parquet has invalid data for data node "{dn.id}"'
|
|
|
|
+ )
|
|
|
|
|
|
np.array_equal(dn.read(), old_data) # The content of the dn should not change when upload fails
|
|
np.array_equal(dn.read(), old_data) # The content of the dn should not change when upload fails
|
|
assert dn.last_edit_date == old_last_edit_date # The last edit date should not change when upload fails
|
|
assert dn.last_edit_date == old_last_edit_date # The last edit date should not change when upload fails
|