# Copyright 2023 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import os
import pathlib
from datetime import datetime
from importlib import util
from time import sleep

import modin.pandas as modin_pd
import numpy as np
import pandas as pd
import pytest
from modin.pandas.test.utils import df_equals
from pandas.testing import assert_frame_equal

from taipy.config.common.scope import Scope
from taipy.config.config import Config
from taipy.config.exceptions.exceptions import InvalidConfigurationId
from taipy.core.data._data_manager import _DataManager
from taipy.core.data.data_node_id import DataNodeId
from taipy.core.data.operator import JoinOperator, Operator
from taipy.core.data.parquet import ParquetDataNode
from taipy.core.exceptions.exceptions import (
    InvalidExposedType,
    NoData,
    UnknownCompressionAlgorithm,
    UnknownParquetEngine,
)
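# This autouse fixture deletes the data_sample/temp.parquet file that several tests
# below write next to this module, so every test starts from a clean state.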
@pytest.fixture(scope="function", autouse=True)
def cleanup():
    yield
    path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
    if os.path.isfile(path):
        os.remove(path)

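# The two helper classes and the factory function below are used as custom exposed_type
# targets: the tests check that reading through them yields instances of these types
# instead of a DataFrame.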
class MyCustomObject:
    def __init__(self, id, integer, text):
        self.id = id
        self.integer = integer
        self.text = text


class MyOtherCustomObject:
    def __init__(self, id, sentence):
        self.id = id
        self.sentence = sentence


def create_custom_class(**kwargs):
    return MyOtherCustomObject(id=kwargs["id"], sentence=kwargs["text"])

class TestParquetDataNode:
    __engine = ["pyarrow"]
    if util.find_spec("fastparquet"):
        __engine.append("fastparquet")
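    # Engines used to parametrize the read/write tests: pyarrow is always exercised,
    # fastparquet only when it is installed.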
    def test_create(self):
        path = "data/node/path"
        compression = "snappy"
        dn = ParquetDataNode(
            "foo_bar", Scope.SCENARIO, properties={"path": path, "compression": compression, "name": "super name"}
        )
        assert isinstance(dn, ParquetDataNode)
        assert dn.storage_type() == "parquet"
        assert dn.config_id == "foo_bar"
        assert dn.name == "super name"
        assert dn.scope == Scope.SCENARIO
        assert dn.id is not None
        assert dn.owner_id is None
        assert dn.last_edit_date is None
        assert dn.job_ids == []
        assert not dn.is_ready_for_reading
        assert dn.path == path
        assert dn.exposed_type == "pandas"
        assert dn.compression == "snappy"
        assert dn.engine == "pyarrow"

        with pytest.raises(InvalidConfigurationId):
            dn = ParquetDataNode("foo bar", Scope.SCENARIO, properties={"path": path, "name": "super name"})

    def test_get_user_properties(self, parquet_file_path):
        dn_1 = ParquetDataNode("dn_1", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert dn_1._get_user_properties() == {}

        dn_2 = ParquetDataNode(
            "dn_2",
            Scope.SCENARIO,
            properties={
                "exposed_type": "numpy",
                "default_data": "foo",
                "default_path": parquet_file_path,
                "engine": "pyarrow",
                "compression": "snappy",
                "read_kwargs": {"columns": ["a", "b"]},
                "write_kwargs": {"index": False},
                "foo": "bar",
            },
        )

        # exposed_type, default_data, default_path, path, engine, compression, read_kwargs, write_kwargs
        # are filtered out
        assert dn_2._get_user_properties() == {"foo": "bar"}

    def test_new_parquet_data_node_with_existing_file_is_ready_for_reading(self, parquet_file_path):
        not_ready_dn_cfg = Config.configure_data_node(
            "not_ready_data_node_config_id", "parquet", path="NOT_EXISTING.parquet"
        )
        not_ready_dn = _DataManager._bulk_get_or_create([not_ready_dn_cfg])[not_ready_dn_cfg]
        assert not not_ready_dn.is_ready_for_reading

        ready_dn_cfg = Config.configure_data_node("ready_data_node_config_id", "parquet", path=parquet_file_path)
        ready_dn = _DataManager._bulk_get_or_create([ready_dn_cfg])[ready_dn_cfg]
        assert ready_dn.is_ready_for_reading

    @pytest.mark.parametrize(
        ["properties", "exists"],
        [
            ({}, False),
            ({"default_data": {"a": ["foo", "bar"]}}, True),
        ],
    )
    def test_create_with_default_data(self, properties, exists):
        dn = ParquetDataNode("foo", Scope.SCENARIO, DataNodeId("dn_id"), properties=properties)
        assert os.path.exists(dn.path) is exists

    @pytest.mark.parametrize("engine", __engine)
    def test_read_file(self, engine, parquet_file_path):
        not_existing_parquet = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": "nonexistent.parquet", "engine": engine}
        )
        with pytest.raises(NoData):
            assert not_existing_parquet.read() is None
            not_existing_parquet.read_or_raise()

        df = pd.read_parquet(parquet_file_path)

        # Create ParquetDataNode without exposed_type (Default is pandas.DataFrame)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 2
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

        # Create ParquetDataNode with numpy exposed_type
        parquet_data_node_as_numpy = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy", "engine": engine}
        )
        data_numpy = parquet_data_node_as_numpy.read()
        assert isinstance(data_numpy, np.ndarray)
        assert len(data_numpy) == 2
        assert np.array_equal(data_numpy, df.to_numpy())

    @pytest.mark.modin
    @pytest.mark.parametrize("engine", __engine)
    def test_read_file_modin(self, engine, parquet_file_path):
        df = pd.read_parquet(parquet_file_path)

        # Create ParquetDataNode with modin exposed_type
        parquet_data_node_as_modin = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin", "engine": engine}
        )
        data_modin = parquet_data_node_as_modin.read()
        assert isinstance(data_modin, modin_pd.DataFrame)
        assert len(data_modin) == 2
        assert data_modin.equals(df)
        assert np.array_equal(data_modin.to_numpy(), df.to_numpy())

    @pytest.mark.parametrize("engine", __engine)
    def test_read_folder(self, engine):
        parquet_folder_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/parquet_example")

        df = pd.read_parquet(parquet_folder_path)
        parquet_data_node_as_pandas = ParquetDataNode(
            "bar", Scope.SCENARIO, properties={"path": parquet_folder_path, "engine": engine}
        )
        data_pandas = parquet_data_node_as_pandas.read()
        assert isinstance(data_pandas, pd.DataFrame)
        assert len(data_pandas) == 5
        assert data_pandas.equals(df)
        assert np.array_equal(data_pandas.to_numpy(), df.to_numpy())

    def test_set_path(self):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": "foo.parquet"})
        assert dn.path == "foo.parquet"
        dn.path = "bar.parquet"
        assert dn.path == "bar.parquet"

    @pytest.mark.parametrize("engine", __engine)
    def test_read_write_after_modify_path(self, engine):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        new_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/temp.parquet")
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": engine})
        read_data = dn.read()
        assert read_data is not None
        dn.path = new_path
        with pytest.raises(FileNotFoundError):
            dn.read()
        dn.write(read_data)
        assert dn.read().equals(read_data)

    def test_read_custom_exposed_type(self):
        example_parquet_path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": MyCustomObject}
        )
        assert all(isinstance(obj, MyCustomObject) for obj in dn.read())

        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": example_parquet_path, "exposed_type": create_custom_class}
        )
        assert all(isinstance(obj, MyOtherCustomObject) for obj in dn.read())

    def test_raise_error_unknown_parquet_engine(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownParquetEngine):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "engine": "foo"})

    def test_raise_error_unknown_compression_algorithm(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(UnknownCompressionAlgorithm):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "compression": "foo"})

    def test_raise_error_invalid_exposed_type(self):
        path = os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.parquet")
        with pytest.raises(InvalidExposedType):
            ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path, "exposed_type": "foo"})

    def test_read_empty_data(self, tmpdir_factory):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        empty_df = pd.DataFrame([])
        empty_df.to_parquet(temp_file_path)

        # Pandas
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})
        assert dn.read().equals(empty_df)

        # Numpy
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "numpy"})
        assert np.array_equal(dn.read(), empty_df.to_numpy())

        # Custom
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": MyCustomObject})
        assert dn.read() == []

    def test_get_system_file_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        pd.DataFrame([]).to_parquet(temp_file_path)
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path, "exposed_type": "pandas"})

        dn.write(pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}))
        previous_edit_date = dn.last_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        new_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))

        assert previous_edit_date < dn.last_edit_date
        assert new_edit_date == dn.last_edit_date

        sleep(0.1)

        dn.write(pd.DataFrame(data={"col1": [9, 10], "col2": [10, 12]}))
        assert new_edit_date < dn.last_edit_date
        os.unlink(temp_file_path)

    def test_get_system_folder_modified_date_instead_of_last_edit_date(self, tmpdir_factory):
        temp_folder_path = tmpdir_factory.mktemp("data").strpath
        temp_file_path = os.path.join(temp_folder_path, "temp.parquet")
        pd.DataFrame([]).to_parquet(temp_file_path)

        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_folder_path})
        initial_edit_date = dn.last_edit_date

        # Sleep so that the file can be created successfully on Ubuntu
        sleep(0.1)

        pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]}).to_parquet(temp_file_path)
        first_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > initial_edit_date
        assert dn.last_edit_date == first_edit_date

        sleep(0.1)

        pd.DataFrame(data={"col1": [5, 6], "col2": [7, 8]}).to_parquet(temp_file_path)
        second_edit_date = datetime.fromtimestamp(os.path.getmtime(temp_file_path))
        assert dn.last_edit_date > first_edit_date
        assert dn.last_edit_date == second_edit_date

        os.unlink(temp_file_path)

    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ],
    )
    def test_append_pandas(self, parquet_file_path, default_data_frame, content):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path})
        assert_frame_equal(dn.read(), default_data_frame)

        dn.append(content)
        assert_frame_equal(
            dn.read(),
            pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(drop=True),
        )

    @pytest.mark.modin
    @pytest.mark.skipif(not util.find_spec("fastparquet"), reason="Append parquet requires fastparquet to be installed")
    @pytest.mark.parametrize(
        "content",
        [
            ([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            (pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}])),
        ],
    )
    def test_append_modin(self, parquet_file_path, default_data_frame, content):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin"})
        df_equals(dn.read(), modin_pd.DataFrame(default_data_frame))

        dn.append(content)
        df_equals(
            dn.read(),
            modin_pd.concat([default_data_frame, pd.DataFrame(content, columns=["a", "b", "c"])]).reset_index(
                drop=True
            ),
        )

    @pytest.mark.parametrize(
        "data",
        [
            [{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}],
            pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
            modin_pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        ],
    )
    def test_write_to_disk(self, tmpdir_factory, data):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
        dn.write(data)

        assert pathlib.Path(temp_file_path).exists()
        assert isinstance(dn.read(), pd.DataFrame)

    @pytest.mark.modin
    @pytest.mark.parametrize(
        "data",
        [
            modin_pd.DataFrame([{"a": 11, "b": 22, "c": 33}, {"a": 44, "b": 55, "c": 66}]),
        ],
    )
    def test_write_to_disk_modin(self, tmpdir_factory, data):
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": temp_file_path})
        dn.write(data)

        assert pathlib.Path(temp_file_path).exists()
        assert isinstance(dn.read(), pd.DataFrame)

    def test_filter_pandas_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "pandas"})
        dn.write(
            [
                {"foo": 1, "bar": 1},
                {"foo": 1, "bar": 2},
                {"foo": 1},
                {"foo": 2, "bar": 2},
                {"bar": 2},
            ]
        )

        # Test datanode indexing and slicing
        assert dn["foo"].equals(pd.Series([1, 1, 1, 2, None]))
        assert dn["bar"].equals(pd.Series([1, 2, None, 2, 2]))
        assert dn[:2].equals(pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

        # Test filter data
        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
        filtered_by_indexing = dn[dn["foo"] == 1]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
        filtered_by_indexing = dn[dn["foo"] != 1]
        expected_data = pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
        filtered_by_indexing = dn[dn["bar"] == 2]
        expected_data = pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
        expected_data = pd.DataFrame(
            [
                {"foo": 1.0, "bar": 1.0},
                {"foo": 1.0, "bar": 2.0},
                {"foo": 2.0, "bar": 2.0},
                {"bar": 2.0},
            ]
        )
        assert_frame_equal(filtered_by_filter_method.reset_index(drop=True), expected_data)
        assert_frame_equal(filtered_by_indexing.reset_index(drop=True), expected_data)

    @pytest.mark.modin
    def test_filter_modin_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "modin"})
        dn.write(
            [
                {"foo": 1, "bar": 1},
                {"foo": 1, "bar": 2},
                {"foo": 1},
                {"foo": 2, "bar": 2},
                {"bar": 2},
            ]
        )

        # Test datanode indexing and slicing
        assert dn["foo"].equals(modin_pd.Series([1, 1, 1, 2, None]))
        assert dn["bar"].equals(modin_pd.Series([1, 2, None, 2, 2]))
        assert dn[:2].equals(modin_pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}]))

        # Test filter data
        filtered_by_filter_method = dn.filter(("foo", 1, Operator.EQUAL))
        filtered_by_indexing = dn[dn["foo"] == 1]
        expected_data = modin_pd.DataFrame([{"foo": 1.0, "bar": 1.0}, {"foo": 1.0, "bar": 2.0}, {"foo": 1.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("foo", 1, Operator.NOT_EQUAL))
        filtered_by_indexing = dn[dn["foo"] != 1]
        expected_data = modin_pd.DataFrame([{"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter(("bar", 2, Operator.EQUAL))
        filtered_by_indexing = dn[dn["bar"] == 2]
        expected_data = modin_pd.DataFrame([{"foo": 1.0, "bar": 2.0}, {"foo": 2.0, "bar": 2.0}, {"bar": 2.0}])
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

        filtered_by_filter_method = dn.filter([("bar", 1, Operator.EQUAL), ("bar", 2, Operator.EQUAL)], JoinOperator.OR)
        filtered_by_indexing = dn[(dn["bar"] == 1) | (dn["bar"] == 2)]
        expected_data = modin_pd.DataFrame(
            [
                {"foo": 1.0, "bar": 1.0},
                {"foo": 1.0, "bar": 2.0},
                {"foo": 2.0, "bar": 2.0},
                {"bar": 2.0},
            ]
        )
        df_equals(filtered_by_filter_method.reset_index(drop=True), expected_data)
        df_equals(filtered_by_indexing.reset_index(drop=True), expected_data)

    def test_filter_numpy_exposed_type(self, parquet_file_path):
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": parquet_file_path, "exposed_type": "numpy"})
        dn.write(
            [
                [1, 1],
                [1, 2],
                [1, 3],
                [2, 1],
                [2, 2],
                [2, 3],
            ]
        )

        # Test datanode indexing and slicing
        assert np.array_equal(dn[0], np.array([1, 1]))
        assert np.array_equal(dn[1], np.array([1, 2]))
        assert np.array_equal(dn[:3], np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[:, 0], np.array([1, 1, 1, 2, 2, 2]))
        assert np.array_equal(dn[1:4, :1], np.array([[1], [1], [2]]))

        # Test filter data
        assert np.array_equal(dn.filter((0, 1, Operator.EQUAL)), np.array([[1, 1], [1, 2], [1, 3]]))
        assert np.array_equal(dn[dn[:, 0] == 1], np.array([[1, 1], [1, 2], [1, 3]]))

        assert np.array_equal(dn.filter((0, 1, Operator.NOT_EQUAL)), np.array([[2, 1], [2, 2], [2, 3]]))
        assert np.array_equal(dn[dn[:, 0] != 1], np.array([[2, 1], [2, 2], [2, 3]]))

        assert np.array_equal(dn.filter((1, 2, Operator.EQUAL)), np.array([[1, 2], [2, 2]]))
        assert np.array_equal(dn[dn[:, 1] == 2], np.array([[1, 2], [2, 2]]))

        assert np.array_equal(
            dn.filter([(1, 1, Operator.EQUAL), (1, 2, Operator.EQUAL)], JoinOperator.OR),
            np.array([[1, 1], [1, 2], [2, 1], [2, 2]]),
        )
        assert np.array_equal(dn[(dn[:, 1] == 1) | (dn[:, 1] == 2)], np.array([[1, 1], [1, 2], [2, 1], [2, 2]]))

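    # The test below checks that read_kwargs from the properties are applied on every
    # dn.read(): only the selected columns come back, and (with pyarrow) the row filter
    # is applied as well.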
    @pytest.mark.parametrize("engine", __engine)
    def test_pandas_parquet_config_kwargs(self, engine, tmpdir_factory):
        read_kwargs = {"filters": [("integer", "<", 10)], "columns": ["integer"]}
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "read_kwargs": read_kwargs}
        )

        df = pd.read_csv(os.path.join(pathlib.Path(__file__).parent.resolve(), "data_sample/example.csv"))
        dn.write(df)

        assert set(pd.read_parquet(temp_file_path).columns) == {"id", "integer", "text"}
        assert set(dn.read().columns) == set(read_kwargs["columns"])

        # !!! filter doesn't work with `fastparquet` without partition_cols
        if engine == "pyarrow":
            assert len(dn.read()) != len(df)
            assert len(dn.read()) == 2

    @pytest.mark.parametrize("engine", __engine)
    def test_kwarg_precedence(self, engine, tmpdir_factory, default_data_frame):
        # Precedence:
        # 1. Class read/write methods
        # 2. Defined in read_kwargs and write_kwargs, in properties
        # 3. Defined top-level in properties
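        # For instance (see the write section below): with compression="snappy" at the top
        # level and write_kwargs={"compression": "gzip"}, dn.write(df) uses gzip, while
        # dn.write_with_kwargs(df, compression="brotli") overrides both and uses brotli.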
        temp_file_path = str(tmpdir_factory.mktemp("data").join("temp.parquet"))
        temp_file_2_path = str(tmpdir_factory.mktemp("data").join("temp_2.parquet"))
        df = default_data_frame.copy(deep=True)

        # Write
        # 3
        comp3 = "snappy"
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_file_path, "engine": engine, "compression": comp3}
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp3, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3 and 2
        comp2 = "gzip"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write(df)
        df.to_parquet(path=temp_file_2_path, compression=comp2, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # 3, 2 and 1
        comp1 = "brotli"
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={
                "path": temp_file_path,
                "engine": engine,
                "compression": comp3,
                "write_kwargs": {"compression": comp2},
            },
        )
        dn.write_with_kwargs(df, compression=comp1)
        df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
        with open(temp_file_2_path, "rb") as tf:
            with pathlib.Path(temp_file_path).open("rb") as f:
                assert f.read() == tf.read()

        # Read
        df.to_parquet(temp_file_path, engine=engine)
        # 2
        cols2 = ["a", "b"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read().columns) == set(cols2)

        # 1
        cols1 = ["a"]
        dn = ParquetDataNode(
            "foo",
            Scope.SCENARIO,
            properties={"path": temp_file_path, "engine": engine, "read_kwargs": {"columns": cols2}},
        )
        assert set(dn.read_with_kwargs(columns=cols1).columns) == set(cols1)

    def test_partition_cols(self, tmpdir_factory, default_data_frame: pd.DataFrame):
        temp_dir_path = str(tmpdir_factory.mktemp("data").join("temp_dir"))

        write_kwargs = {"partition_cols": ["a", "b"]}
        dn = ParquetDataNode(
            "foo", Scope.SCENARIO, properties={"path": temp_dir_path, "write_kwargs": write_kwargs}
        )  # type: ignore
        dn.write(default_data_frame)

        assert pathlib.Path(temp_dir_path).is_dir()
        # dtypes change during round-trip with partition_cols
        pd.testing.assert_frame_equal(
            dn.read().sort_index(axis=1),
            default_data_frame.sort_index(axis=1),
            check_dtype=False,
            check_categorical=False,
        )
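    # Note: with partition_cols the data is written as a partitioned directory dataset
    # (one sub-folder per partition value) rather than a single .parquet file, which is
    # why the path above ends up being a directory.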
    def test_read_with_kwargs_never_written(self):
        path = "data/node/path"
        dn = ParquetDataNode("foo", Scope.SCENARIO, properties={"path": path})
        assert dn.read_with_kwargs() is None