test_parquet_data_node.py

# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import os
import pathlib
from datetime import datetime
from importlib import util
from time import sleep

import numpy as np
import pandas as pd
import pytest
from pandas.testing import assert_frame_equal

from taipy.config.common.scope import Scope
from taipy.config.config import Config
from taipy.config.exceptions.exceptions import InvalidConfigurationId
from taipy.core.data._data_manager import _DataManager
from taipy.core.data.data_node_id import DataNodeId
from taipy.core.data.operator import JoinOperator, Operator
from taipy.core.data.parquet import ParquetDataNode
from taipy.core.exceptions.exceptions import (
    InvalidExposedType,
    NoData,
    UnknownCompressionAlgorithm,
    UnknownParquetEngine,
)


@pytest.fixture(scope="function", autouse=True)
def cleanup():
    yield
    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
    if os.path.isfile(path):
        os.remove(path)


class MyCustomObject:
    def __init__(self, id, integer, text):
        self.id = id
        self.integer = integer
        self.text = text


class MyOtherCustomObject:
    def __init__(self, id, sentence):
        self.id = id
        self.sentence = sentence


def create_custom_class(**kwargs):
    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])


class TestParquetDataNode:
    __engine = ["pyarrow"]
    if util.find_spec("fastparquet"):
        __engine.append("fastparquet")

    def test_create(self):
        path = "data/node/path"
        compression = "snappy"
        dn = ParquetDataNode(
            "foo_bar", Scope.SCENARIO, properties={"path": path, "compression": compression, "name": "super name"}
        )
        assert isinstance(dn, ParquetDataNode)
        assert dn.storage_type() == "parquet"
        assert dn.config_id == "foo_bar"
        assert dn.name == "super name"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.owner_id is None
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert not dn.is_ready_for_reading
        assert dn.path == path
        assert dn.exposed_type == "pandas"
        assert dn.compression == "snappy"
        assert dn.engine == "pyarrow"

        with pytest.raises(InvalidConfigurationId):
            dn = ParquetDataNode("foo bar", Scope.SCENARIO, properties={"path": path, "name": "super name"})

    def test_get_user_properties(self, parquet_file_path):
        dn_1 = ParquetDataNode("dn_1", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert dn_1._get_user_properties() == {}

        dn_2 = ParquetDataNode(
            "dn_2",
            Scope.SCENARIO,
            properties={
                "exposed_type": "numpy",
                "default_data": "foo",
                "default_path": parquet_file_path,
                "engine": "pyarrow",
                "compression": "snappy",
                "read_kwargs": {"columns": ["a", "b"]},
                "write_kwargs": {"index": False},
                "foo": "bar",
            },
        )

        # exposed_type, default_data, default_path, path, engine, compression, read_kwargs, write_kwargs
        # are filtered out
        assert dn_2._get_user_properties() == {"foo": "bar"}

    def test_new_parquet_data_node_with_existing_file_is_ready_for_reading(self, parquet_file_path):
        not_ready_dn_cfg = Config.configure_data_node(
            "not_ready_data_node_config_id", "parquet", path="NOT_EXISTING.parquet"
        )
        not_ready_dn = _DataManager._bulk_get_or_create([not_ready_dn_cfg])[not_ready_dn_cfg]
        assert not not_ready_dn.is_ready_for_reading

        ready_dn_cfg = Config.configure_data_node("ready_data_node_config_id", "parquet", path=parquet_file_path)
        ready_dn = _DataManager._bulk_get_or_create([ready_dn_cfg])[ready_dn_cfg]
        assert ready_dn.is_ready_for_reading
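
    # When default_data is provided, the data node is expected to materialize it at `path` as soon
    # as the node is created; the `exists` flag below checks exactly that.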
    @pytest.mark.parametrize(
        ["properties", "exists"],
        [
            ({}, False),
            ({"default_data": {"a": ["foo", "bar"]}}, True),
        ],
    )
    def test_create_with_default_data(self, properties, exists):
        dn = ParquetDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
        assert os.path.exists(dn.path) is exists

    @pytest.mark.parametrize("engine", __engine)
    def test_modin_deprecated_in_favor_of_pandas(self, engine, parquet_file_path):
        # Create ParquetDataNode with modin exposed_type
        props = {"path": parquet_file_path, "exposed_type": "modin", "engine": engine}
        parquet_data_node_as_modin = ParquetDataNode("bar", Scope.SCENARIO, properties=props)
        assert parquet_data_node_as_modin.properties["exposed_type"] == "pandas"
        data_modin = parquet_data_node_as_modin.read()
        assert isinstance(data_modin, pd.DataFrame)

    @pytest.mark.parametrize("engine", __engine)
    def test_read_file(self, engine, parquet_file_path):
        not_existing_parquet = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
        )
        with pytest.raises(NoData):
            assert not_existing_parquet.read() is None
            not_existing_parquet.read_or_raise()

        df = pd.read_parquet(parquet_file_path)

        # Create ParquetDataNode without exposed_type (Default is pandas.DataFrame)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 2
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

        # Create ParquetDataNode with numpy exposed_type
        parquet_data_node_as_numpy = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
        )
        data_numpy = parquet_data_node_as_numpy.read()
        assert isinstance(data_numpy, np.ndarray)
        assert len(data_numpy) == 2
        assert np.array_equal(data_numpy, df.to_numpy())

    @pytest.mark.parametrize("engine", __engine)
    def test_read_folder(self, engine):
        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")

        df = pd.read_parquet(parquet_folder_path)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 5
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

    def test_set_path(self):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"})
        assert dn.path == "foo.parquet"
        dn.path = "bar.parquet"
        assert dn.path == "bar.parquet"

    @pytest.mark.parametrize("engine", __engine)
    def test_read_write_after_modify_path(self, engine):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
        read_data = dn.read()
        assert read_data is not None
        dn.path = new_path
        with pytest.raises(FileNotFoundError):
            dn.read()
        dn.write(read_data)
        assert dn.read().equals(read_data)

    def test_read_custom_exposed_type(self):
        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
        )
        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
        )
        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())

    def test_raise_error_unknown_parquet_engine(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownParquetEngine):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": "foo"})

    def test_raise_error_unknown_compression_algorithm(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownCompressionAlgorithm):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "compression": "foo"})

    def test_raise_error_invalid_exposed_type(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(InvalidExposedType):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"})

    def test_read_empty_data(self, tmpdir_factory):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        empty_df = pd.DataFrame([])
        empty_df.to_parquet(temp_file_path)

        # Pandas
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
        assert dn.read().equals(empty_df)

        # Numpy
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
        assert np.array_equal(dn.read(), empty_df.to_numpy())

        # Custom
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
        assert dn.read() == []

    def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        pd.DataFrame([]).to_parquet(temp_file_path)
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})

        dn.write(pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}))
        previous_edit_date = dn.last_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        new_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert previous_edit_date < dn.last_edit_date
        assert new_edit_date == dn.last_edit_date

        sleep(0.1)

        dn.write(pd.DataFrame(data={"col1": [9, 10], "col2": [10, 12]}))
        assert new_edit_date < dn.last_edit_date
        os.unlink(temp_file_path)

    def test_get_system_folder_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        temp_folder_path = tmpdir_factory.mktemp("data").strpath
        temp_file_path = os.path.join(temp_folder_path, "temp.parquet")
        pd.DataFrame([]).to_parquet(temp_file_path)

        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_folder_path})
        initial_edit_date = dn.last_edit_date

        # Sleep so that the file can be created successfully on Ubuntu
        sleep(0.1)

        pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}).to_parquet(temp_file_path)
        first_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > initial_edit_date
        assert dn.last_edit_date == first_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        second_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > first_edit_date
        assert dn.last_edit_date == second_edit_date

        os.unlink(temp_file_path)
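
    # Appending presumably relies on the engine's append support: the fastparquet engine can append
    # to an existing file (e.g. DataFrame.to_parquet(..., append=True)), while the pyarrow engine
    # cannot, hence the skip below when fastparquet is not installed.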
    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ],
    )
    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert_frame_equal(dn.read(), default_data_frame)

        dn.append(content)
        assert_frame_equal(
            dn.read(),
            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
        )

    @pytest.mark.parametrize(
        "data",
        [
            [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}],
            pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        ],
    )
    def test_write_to_disk(self, tmpdir_factory, data):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
        dn.write(data)

        assert pathlib.Path(temp_file_path).exists()
        assert isinstance(dn.read(), pd.DataFrame)

    def test_filter_pandas_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
        dn.write(
            [
                {"foo": 1, "bar": 1},
                {"foo": 1, "bar": 2},
                {"foo": 1},
                {"foo": 2, "bar": 2},
                {"bar": 2},
            ]
        )

        # Test datanode indexing and slicing
        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

        # Test filter data
        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
        filtered_by_indexing = dn[dn["foo"] == 1]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
        filtered_by_indexing = dn[dn["foo"] != 1]
        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
        filtered_by_indexing = dn[dn["bar"] == 2]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
        expected_data = pd.DataFrame(
            [
                {"foo": 1.0, "bar": 1.0},
                {"foo": 1.0, "bar": 2.0},
                {"foo": 2.0, "bar": 2.0},
                {"bar": 2.0},
            ]
        )
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

    def test_filter_numpy_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
        dn.write(
            [
                [1, 1],
                [1, 2],
                [1, 3],
                [2, 1],
                [2, 2],
                [2, 3],
            ]
        )

        # Test datanode indexing and slicing
        assert np.array_equal(dn[0], np.array([1, 1]))
        assert np.array_equal(dn[1], np.array([1, 2]))
        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))

        # Test filter data
        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))

        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))

        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))

        assert np.array_equal(
            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
        )
        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

    @pytest.mark.parametrize("engine", __engine)
    def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
        )

        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
        dn.write(df)

        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
        assert set(dn.read().columns) == set(read_kwargs["columns"])

        # !!! filter doesn't work with `fastparquet` without partition_cols
        if engine == "pyarrow":
            assert len(dn.read()) != len(df)
            assert len(dn.read()) == 2
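
    # The row-count check above is limited to pyarrow: with the pyarrow engine,
    # pandas.read_parquet(filters=...) prunes rows at read time even for a single file, whereas
    # fastparquet (as noted above) generally only honors such filters on partitioned datasets.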
    @pytest.mark.parametrize("engine", __engine)
    def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
        # Precedence:
        # 1. Class read/write methods
        # 2. Defined in read_kwargs and write_kwargs, in properties
        # 3. Defined top-level in properties
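        #
        # Conceptually (an illustrative sketch only, not the actual implementation), the effective
        # kwargs are resolved with later entries winning:
        #     effective = {**top_level_properties, **properties["write_kwargs"], **method_kwargs}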
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
        df = default_data_frame.copy(deep=True)

        # Write
        # 3
        comp3 = "snappy"
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3 and 2
        comp2 = "gzip"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3, 2 and 1
        comp1 = "brotli"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write_with_kwargs(df, compression=comp1)
        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # Read
        df.to_parquet(temp_file_path, engine=engine)
        # 2
        cols2 = ["a", "b"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read().columns) == set(cols2)

        # 1
        cols1 = ["a"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)

    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))

        write_kwargs = {"partition_cols": ["a", "b"]}
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs})  # type: ignore
        dn.write(default_data_frame)

        assert pathlib.Path(temp_dir_path).is_dir()
        # dtypes change during round-trip with partition_cols
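        # (the partition columns are encoded in the directory structure and typically come back
        # with different dtypes, e.g. as categoricals, hence check_dtype=False / check_categorical=False)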
        pd.testing.assert_frame_equal(
            dn.read().sort_index(axis=1),
            default_data_frame.sort_index(axis=1),
            check_dtype=False,
            check_categorical=False,
        )

    def test_read_with_kwargs_never_written(self):
        path = "data/node/path"
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
        assert dn.read_with_kwargs() is None