# Copyright 2023 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import os
import pathlib
from datetime import datetime
from importlib import util
from time import sleep

import modin.pandas as modin_pd
import numpy as np
import pandas as pd
import pytest
from modin.pandas.test.utils import df_equals
from pandas.testing import assert_frame_equal

from src.taipy.core.data._data_manager import _DataManager
from src.taipy.core.data.data_node_id import DataNodeId
from src.taipy.core.data.operator import JoinOperator, Operator
from src.taipy.core.data.parquet import ParquetDataNode
from src.taipy.core.exceptions.exceptions import (
    InvalidExposedType,
    NoData,
    UnknownCompressionAlgorithm,
    UnknownParquetEngine,
)
from taipy.config.common.scope import Scope
from taipy.config.config import Config
from taipy.config.exceptions.exceptions import InvalidConfigurationId
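

# Remove the temporary parquet file that some tests (e.g. test_read_write_after_modify_path)
# write into data_sample/ so that test runs stay independent.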
@pytest.fixture(scope="function", autouse=True)
def cleanup():
    yield
    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
    if os.path.isfile(path):
        os.remove(path)


class MyCustomObject:
    def __init__(self, id, integer, text):
        self.id = id
        self.integer = integer
        self.text = text


class MyOtherCustomObject:
    def __init__(self, id, sentence):
        self.id = id
        self.sentence = sentence


def create_custom_class(**kwargs):
    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])
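

# NOTE: the `parquet_file_path` and `default_data_frame` fixtures used by the tests below are
# assumed to be provided by the test suite's conftest.py (a small sample parquet file and a
# matching pandas DataFrame with columns "a", "b" and "c").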
class TestParquetDataNode:
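    # Parquet engines to parametrize the tests over; fastparquet is only exercised when it is
    # installed in the test environment.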
    __engine = ["pyarrow"]
    if util.find_spec("fastparquet"):
        __engine.append("fastparquet")

    def test_create(self):
        path = "data/node/path"
        compression = "snappy"
        dn = ParquetDataNode(
            "foo_bar", Scope.SCENARIO, properties={"path": path, "compression": compression, "name": "super name"}
        )
        assert isinstance(dn, ParquetDataNode)
        assert dn.storage_type() == "parquet"
        assert dn.config_id == "foo_bar"
        assert dn.name == "super name"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.owner_id is None
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert not dn.is_ready_for_reading
        assert dn.path == path
        assert dn.exposed_type == "pandas"
        assert dn.compression == "snappy"
        assert dn.engine == "pyarrow"

        with pytest.raises(InvalidConfigurationId):
            dn = ParquetDataNode("foo bar", Scope.SCENARIO, properties={"path": path, "name": "super name"})

    def test_get_user_properties(self, parquet_file_path):
        dn_1 = ParquetDataNode("dn_1", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert dn_1._get_user_properties() == {}

        dn_2 = ParquetDataNode(
            "dn_2",
            Scope.SCENARIO,
            properties={
                "exposed_type": "numpy",
                "default_data": "foo",
                "default_path": parquet_file_path,
                "engine": "pyarrow",
                "compression": "snappy",
                "read_kwargs": {"columns": ["a", "b"]},
                "write_kwargs": {"index": False},
                "foo": "bar",
            },
        )

        # exposed_type, default_data, default_path, path, engine, compression,
        # read_kwargs and write_kwargs are filtered out
        assert dn_2._get_user_properties() == {"foo": "bar"}

    def test_new_parquet_data_node_with_existing_file_is_ready_for_reading(self, parquet_file_path):
        not_ready_dn_cfg = Config.configure_data_node(
            "not_ready_data_node_config_id", "parquet", path="NOT_EXISTING.parquet"
        )
        not_ready_dn = _DataManager._bulk_get_or_create([not_ready_dn_cfg])[not_ready_dn_cfg]
        assert not not_ready_dn.is_ready_for_reading

        ready_dn_cfg = Config.configure_data_node("ready_data_node_config_id", "parquet", path=parquet_file_path)
        ready_dn = _DataManager._bulk_get_or_create([ready_dn_cfg])[ready_dn_cfg]
        assert ready_dn.is_ready_for_reading

    @pytest.mark.parametrize(
        ["properties", "exists"],
        [
            ({}, False),
            ({"default_data": {"a": ["foo", "bar"]}}, True),
        ],
    )
    def test_create_with_default_data(self, properties, exists):
        dn = ParquetDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
        assert os.path.exists(dn.path) is exists

    @pytest.mark.parametrize("engine", __engine)
    def test_read_file(self, engine, parquet_file_path):
        not_existing_parquet = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
        )
        with pytest.raises(NoData):
            assert not_existing_parquet.read() is None
            not_existing_parquet.read_or_raise()

        df = pd.read_parquet(parquet_file_path)

        # Create ParquetDataNode without exposed_type (default is pandas.DataFrame)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 2
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

        # Create ParquetDataNode with modin exposed_type
        parquet_data_node_as_modin = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin", "engine": engine}
        )
        data_modin = parquet_data_node_as_modin.read()
        assert isinstance(data_modin, modin_pd.DataFrame)
        assert len(data_modin) == 2
        assert data_modin.equals(df)
        assert np.array_equal(data_modin.to_numpy(), df.to_numpy())

        # Create ParquetDataNode with numpy exposed_type
        parquet_data_node_as_numpy = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
        )
        data_numpy = parquet_data_node_as_numpy.read()
        assert isinstance(data_numpy, np.ndarray)
        assert len(data_numpy) == 2
        assert np.array_equal(data_numpy, df.to_numpy())

    @pytest.mark.parametrize("engine", __engine)
    def test_read_folder(self, engine):
        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")

        df = pd.read_parquet(parquet_folder_path)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 5
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

    def test_set_path(self):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"})
        assert dn.path == "foo.parquet"
        dn.path = "bar.parquet"
        assert dn.path == "bar.parquet"

    @pytest.mark.parametrize("engine", __engine)
    def test_read_write_after_modify_path(self, engine):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
        read_data = dn.read()
        assert read_data is not None
        dn.path = new_path
        with pytest.raises(FileNotFoundError):
            dn.read()
        dn.write(read_data)
        assert dn.read().equals(read_data)

    def test_read_custom_exposed_type(self):
        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
        )
        assert all([isinstance(obj, MyCustomObject) for obj in dn.read()])

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
        )
        assert all([isinstance(obj, MyOtherCustomObject) for obj in dn.read()])

    def test_raise_error_unknown_parquet_engine(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownParquetEngine):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": "foo"})

    def test_raise_error_unknown_compression_algorithm(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownCompressionAlgorithm):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "compression": "foo"})

    def test_raise_error_invalid_exposed_type(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(InvalidExposedType):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"})

    def test_read_empty_data(self, tmpdir_factory):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        empty_df = pd.DataFrame([])
        empty_df.to_parquet(temp_file_path)

        # Pandas
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
        assert dn.read().equals(empty_df)

        # Numpy
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
        assert np.array_equal(dn.read(), empty_df.to_numpy())

        # Custom
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
        assert dn.read() == []

    def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
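        # When the underlying file is modified outside of the data node's own `write`, the data
        # node's last_edit_date is expected to reflect the file system modification time instead.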
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        pd.DataFrame([]).to_parquet(temp_file_path)
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})

        dn.write(pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}))
        previous_edit_date = dn.last_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        new_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))

        assert previous_edit_date < dn.last_edit_date
        assert new_edit_date == dn.last_edit_date

        sleep(0.1)

        dn.write(pd.DataFrame(data={"col1": [9, 10], "col2": [10, 12]}))
        assert new_edit_date < dn.last_edit_date
        os.unlink(temp_file_path)

    def test_get_system_folder_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        temp_folder_path = tmpdir_factory.mktemp("data").strpath
        temp_file_path = os.path.join(temp_folder_path, "temp.parquet")
        pd.DataFrame([]).to_parquet(temp_file_path)

        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_folder_path})
        initial_edit_date = dn.last_edit_date

        # Sleep so that the file can be created successfully on Ubuntu
        sleep(0.1)

        pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}).to_parquet(temp_file_path)
        first_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > initial_edit_date
        assert dn.last_edit_date == first_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        second_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > first_edit_date
        assert dn.last_edit_date == second_edit_date

        os.unlink(temp_file_path)

    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ],
    )
    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert_frame_equal(dn.read(), default_data_frame)

        dn.append(content)
        assert_frame_equal(
            dn.read(),
            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
        )

    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ],
    )
    def test_append_modin(self, parquet_file_path, default_data_frame, content):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin"})
        df_equals(dn.read(), modin_pd.DataFrame(default_data_frame))

        dn.append(content)
        df_equals(
            dn.read(),
            modin_pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(
                drop=True
            ),
        )

    @pytest.mark.parametrize(
        "data",
        [
            [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}],
            pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            modin_pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        ],
    )
    def test_write_to_disk(self, tmpdir_factory, data):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
        dn.write(data)

        assert pathlib.Path(temp_file_path).exists()
        assert isinstance(dn.read(), pd.DataFrame)

    def test_filter_pandas_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
        dn.write(
            [
                {"foo": 1, "bar": 1},
                {"foo": 1, "bar": 2},
                {"foo": 1},
                {"foo": 2, "bar": 2},
                {"bar": 2},
            ]
        )

        # Test datanode indexing and slicing
        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

        # Test filter data
        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
        filtered_by_indexing = dn[dn["foo"] == 1]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
        filtered_by_indexing = dn[dn["foo"] != 1]
        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
        filtered_by_indexing = dn[dn["bar"] == 2]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
        expected_data = pd.DataFrame(
            [
                {"foo": 1.0, "bar": 1.0},
                {"foo": 1.0, "bar": 2.0},
                {"foo": 2.0, "bar": 2.0},
                {"bar": 2.0},
            ]
        )
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

    def test_filter_modin_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin"})
        dn.write(
            [
                {"foo": 1, "bar": 1},
                {"foo": 1, "bar": 2},
                {"foo": 1},
                {"foo": 2, "bar": 2},
                {"bar": 2},
            ]
        )

        # Test datanode indexing and slicing
        assert dn["foo"].equals(modin_pd.Series([1, 1, 1, 2, None]))
        assert dn["bar"].equals(modin_pd.Series([1, 2, None, 2, 2]))
        assert dn[:2].equals(modin_pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

        # Test filter data
        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
        filtered_by_indexing = dn[dn["foo"] == 1]
        expected_data = modin_pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
        filtered_by_indexing = dn[dn["foo"] != 1]
        expected_data = modin_pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
        filtered_by_indexing = dn[dn["bar"] == 2]
        expected_data = modin_pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
        expected_data = modin_pd.DataFrame(
            [
                {"foo": 1.0, "bar": 1.0},
                {"foo": 1.0, "bar": 2.0},
                {"foo": 2.0, "bar": 2.0},
                {"bar": 2.0},
            ]
        )
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

    def test_filter_numpy_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
        dn.write(
            [
                [1, 1],
                [1, 2],
                [1, 3],
                [2, 1],
                [2, 2],
                [2, 3],
            ]
        )

        # Test datanode indexing and slicing
        assert np.array_equal(dn[0], np.array([1, 1]))
        assert np.array_equal(dn[1], np.array([1, 2]))
        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))

        # Test filter data
        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))

        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))

        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))

        assert np.array_equal(
            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
        )
        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

    @pytest.mark.parametrize("engine", __engine)
    def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
        )

        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
        dn.write(df)

        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
        assert set(dn.read().columns) == set(read_kwargs["columns"])

        # !!! filter doesn't work with `fastparquet` without partition_cols
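        # (with pyarrow, the `filters` read_kwarg is applied even to a plain, unpartitioned file,
        # while fastparquet is generally expected to honour it only for partitioned datasets,
        # which is why the row-count assertions below run for pyarrow only)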
        if engine == "pyarrow":
            assert len(dn.read()) != len(df)
            assert len(dn.read()) == 2

    @pytest.mark.parametrize("engine", __engine)
    def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
        # Precedence:
        # 1. Class read/write methods
        # 2. Defined in read_kwargs and write_kwargs, in properties
        # 3. Defined top-level in properties
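        # For example (as exercised below): with compression="snappy" in properties,
        # write_kwargs={"compression": "gzip"}, and write_with_kwargs(df, compression="brotli"),
        # the file ends up brotli-compressed because the method argument wins.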
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
        df = default_data_frame.copy(deep=True)

        # Write
        # 3
        comp3 = "snappy"
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3 and 2
        comp2 = "gzip"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3, 2 and 1
        comp1 = "brotli"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write_with_kwargs(df, compression=comp1)
        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # Read
        df.to_parquet(temp_file_path, engine=engine)
        # 2
        cols2 = ["a", "b"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read().columns) == set(cols2)

        # 1
        cols1 = ["a"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)

    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))

        write_kwargs = {"partition_cols": ["a", "b"]}
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs}
        )  # type: ignore
        dn.write(default_data_frame)

        assert pathlib.Path(temp_dir_path).is_dir()
        # dtypes change during round-trip with partition_cols
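        # (partition columns are typically read back as categorical/string values rather than
        # their original numeric dtypes, hence check_dtype=False below)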
        pd.testing.assert_frame_equal(
            dn.read().sort_index(axis=1),
            default_data_frame.sort_index(axis=1),
            check_dtype=False,
            check_categorical=False,
        )

    def test_read_with_kwargs_never_written(self):
        path = "data/node/path"
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
        assert dn.read_with_kwargs() is None