# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from datetime import datetime, timedelta
from os.path import isdir, isfile
from typing import Any, Dict, List, Optional, Set

import numpy as np
import pandas as pd

from taipy.config.common.scope import Scope

from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import UnknownCompressionAlgorithm, UnknownParquetEngine
from ..job.job_id import JobId
from ._file_datanode_mixin import _FileDataNodeMixin
from ._tabular_datanode_mixin import _TabularDataNodeMixin
from .data_node import DataNode
from .data_node_id import DataNodeId, Edit


class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
    """Data Node stored as a Parquet file.

    Attributes:
        config_id (str): Identifier of the data node configuration. This string must be a valid
            Python identifier.
        scope (Scope^): The scope of this data node.
        id (str): The unique identifier of this data node.
        owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or `None`.
        parent_ids (Optional[Set[str]]): The identifiers of the parent tasks or `None`.
        last_edit_date (datetime): The date and time of the last modification.
        edits (List[Edit^]): The ordered list of edits of this data node.
        version (str): The string indicates the application version of the data node to instantiate.
            If not provided, the current version is used.
        validity_period (Optional[timedelta]): The duration since the last edit date for which the
            data node can be considered up-to-date. Once the validity period has passed, the data
            node is considered stale and relevant tasks will run even if they are skippable (see the
            [Task management page](../core/entities/task-mgt.md) for more details).
            If _validity_period_ is set to `None`, the data node is always up-to-date.
        edit_in_progress (bool): True if a task computing the data node has been submitted
            and not completed yet. False otherwise.
        editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
        editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
        path (str): The path to the Parquet file.
        properties (dict[str, Any]): A dictionary of additional properties. *properties*
            must have a *"default_path"* or *"path"* entry with the path of the Parquet file:

            - *"default_path"* (`str`): The default path of the Parquet file.
            - *"exposed_type"*: The exposed type of the data read from the Parquet file. The default
                value is `pandas`.
            - *"engine"* (`Optional[str]`): Parquet library to use. Possible values are
                *"fastparquet"* or *"pyarrow"*.<br/>
                The default value is *"pyarrow"*.
            - *"compression"* (`Optional[str]`): Name of the compression to use. Possible values
                are *"snappy"*, *"gzip"*, *"brotli"*, or *"none"* (no compression).<br/>
                The default value is *"snappy"*.
            - *"read_kwargs"* (`Optional[dict]`): Additional parameters passed to the
                `pandas.read_parquet()` function.
            - *"write_kwargs"* (`Optional[dict]`): Additional parameters passed to the
                `pandas.DataFrame.to_parquet()` function.<br/>
                The parameters in *"read_kwargs"* and *"write_kwargs"* have a
                **higher precedence** than the top-level parameters (*"engine"* and
                *"compression"*), which are also passed to pandas.
    """
    __STORAGE_TYPE = "parquet"
    __ENGINE_PROPERTY = "engine"
    __VALID_PARQUET_ENGINES = ["pyarrow", "fastparquet"]
    __COMPRESSION_PROPERTY = "compression"
    __VALID_COMPRESSION_ALGORITHMS = ["snappy", "gzip", "brotli", "none"]
    __READ_KWARGS_PROPERTY = "read_kwargs"
    __WRITE_KWARGS_PROPERTY = "write_kwargs"
    _REQUIRED_PROPERTIES: List[str] = []

    def __init__(
        self,
        config_id: str,
        scope: Scope,
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        properties: Optional[Dict] = None,
    ):
        self.id = id or self._new_id(config_id)

        if properties is None:
            properties = {}

        if self.__ENGINE_PROPERTY not in properties.keys():
            properties[self.__ENGINE_PROPERTY] = "pyarrow"
        if properties[self.__ENGINE_PROPERTY] not in self.__VALID_PARQUET_ENGINES:
            raise UnknownParquetEngine(
                f"Invalid parquet engine: {properties[self.__ENGINE_PROPERTY]}. "
                f"Supported engines are {', '.join(self.__VALID_PARQUET_ENGINES)}"
            )

        if self.__COMPRESSION_PROPERTY not in properties.keys():
            properties[self.__COMPRESSION_PROPERTY] = "snappy"
        if properties[self.__COMPRESSION_PROPERTY] == "none":
            # pandas expects None (not the string "none") for an uncompressed write.
            properties[self.__COMPRESSION_PROPERTY] = None
        if (
            properties[self.__COMPRESSION_PROPERTY]
            and properties[self.__COMPRESSION_PROPERTY] not in self.__VALID_COMPRESSION_ALGORITHMS
        ):
            raise UnknownCompressionAlgorithm(
                f"Unsupported compression algorithm: {properties[self.__COMPRESSION_PROPERTY]}. "
                f"Supported algorithms are {', '.join(self.__VALID_COMPRESSION_ALGORITHMS)}"
            )

        if self.__READ_KWARGS_PROPERTY not in properties.keys():
            properties[self.__READ_KWARGS_PROPERTY] = {}

        if self.__WRITE_KWARGS_PROPERTY not in properties.keys():
            properties[self.__WRITE_KWARGS_PROPERTY] = {}

        properties[self._EXPOSED_TYPE_PROPERTY] = _TabularDataNodeMixin._get_valid_exposed_type(properties)
        self._check_exposed_type(properties[self._EXPOSED_TYPE_PROPERTY])

        default_value = properties.pop(self._DEFAULT_DATA_KEY, None)
        _FileDataNodeMixin.__init__(self, properties)
        _TabularDataNodeMixin.__init__(self, **properties)
        DataNode.__init__(
            self,
            config_id,
            scope,
            self.id,
            owner_id,
            parent_ids,
            last_edit_date,
            edits,
            version or _VersionManagerFactory._build_manager()._get_latest_version(),
            validity_period,
            edit_in_progress,
            editor_id,
            editor_expiration_date,
            **properties,
        )
        self._write_default_data(default_value)

        if not self._last_edit_date and (isfile(self._path) or isdir(self._path)):
            self._last_edit_date = datetime.now()

        self._TAIPY_PROPERTIES.update(
            {
                self._EXPOSED_TYPE_PROPERTY,
                self._PATH_KEY,
                self._DEFAULT_PATH_KEY,
                self._DEFAULT_DATA_KEY,
                self._IS_GENERATED_KEY,
                self.__ENGINE_PROPERTY,
                self.__COMPRESSION_PROPERTY,
                self.__READ_KWARGS_PROPERTY,
                self.__WRITE_KWARGS_PROPERTY,
            }
        )
    @classmethod
    def storage_type(cls) -> str:
        return cls.__STORAGE_TYPE

    def _read(self):
        return self.read_with_kwargs()

    def _read_as(self, read_kwargs: Dict):
        custom_class = self.properties[self._EXPOSED_TYPE_PROPERTY]
        list_of_dicts = self._read_as_pandas_dataframe(read_kwargs).to_dict(orient="records")
        return [custom_class(**dct) for dct in list_of_dicts]

    def _read_as_numpy(self, read_kwargs: Dict) -> np.ndarray:
        return self._read_as_pandas_dataframe(read_kwargs).to_numpy()

    def _read_as_pandas_dataframe(self, read_kwargs: Dict) -> pd.DataFrame:
        return pd.read_parquet(self._path, **read_kwargs)

    def _append(self, data: Any):
        # Appending requires the "fastparquet" engine: pandas' pyarrow backend does not
        # support the `append` keyword in `to_parquet()`.
        self.write_with_kwargs(data, engine="fastparquet", append=True)

    def _write(self, data: Any):
        self.write_with_kwargs(data)

    def write_with_kwargs(self, data: Any, job_id: Optional[JobId] = None, **write_kwargs):
        """Write the data referenced by this data node.

        Keyword arguments provided here take precedence over the same arguments defined
        in the data node configuration.

        Parameters:
            data (Any): The data to write.
            job_id (JobId^): An optional identifier of the writer.
            **write_kwargs (dict[str, any]): The keyword arguments passed to the function
                `pandas.DataFrame.to_parquet()`.
        """
        kwargs = {
            self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
            self.__COMPRESSION_PROPERTY: self.properties[self.__COMPRESSION_PROPERTY],
        }
        kwargs.update(self.properties[self.__WRITE_KWARGS_PROPERTY])
        kwargs.update(write_kwargs)

        if isinstance(data, pd.Series):
            df = pd.DataFrame(data)
        else:
            df = self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data)

        # Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
        df.columns = df.columns.astype(str)

        df.to_parquet(self._path, **kwargs)
        self.track_edit(timestamp=datetime.now(), job_id=job_id)
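    # A hedged per-call override sketch (the `node` variable and the `df` data are
    # hypothetical): extra keyword arguments are forwarded verbatim to
    # `pandas.DataFrame.to_parquet()`, so a one-off partitioned, brotli-compressed
    # write could look like:
    #
    #     node.write_with_kwargs(df, partition_cols=["year"], compression="brotli")
    #
    # Here `partition_cols=["year"]` writes a directory dataset partitioned by year,
    # while the node's stored "compression" property ("snappy" by default) is left
    # untouched for subsequent writes.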
    def read_with_kwargs(self, **read_kwargs):
        """Read data from this data node.

        Keyword arguments provided here take precedence over the same arguments defined
        in the data node configuration.

        Parameters:
            **read_kwargs (dict[str, any]): The keyword arguments passed to the function
                `pandas.read_parquet()`.
        """
        # Return None if the data was never written.
        if not self.last_edit_date:
            self._DataNode__logger.warning(
                f"Data node {self.id} from config {self.config_id} is being read but has never been written."
            )
            return None

        # Copy the stored read_kwargs so that per-call overrides do not mutate the
        # data node's persisted properties.
        kwargs = dict(self.properties[self.__READ_KWARGS_PROPERTY])
        kwargs.update(
            {
                self.__ENGINE_PROPERTY: self.properties[self.__ENGINE_PROPERTY],
            }
        )
        kwargs.update(read_kwargs)

        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
            return self._read_as_pandas_dataframe(kwargs)
        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
            return self._read_as_numpy(kwargs)
        return self._read_as(kwargs)
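    # A hedged read sketch (the `node` variable and column names are hypothetical):
    # per-call keyword arguments flow straight into `pandas.read_parquet()`, so column
    # pruning and pyarrow-style row filtering work as usual:
    #
    #     df = node.read_with_kwargs(
    #         columns=["date", "amount"],
    #         filters=[("year", "=", 2023)],
    #     )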