csv.py

# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import csv
import os
from datetime import datetime, timedelta
from os.path import isfile
from typing import Any, Dict, List, Optional, Set

import numpy as np
import pandas as pd

from taipy.config.common.scope import Scope

from .._backup._backup import _replace_in_backup_file
from .._entity._reload import _self_reload
from .._version._version_manager_factory import _VersionManagerFactory
from ..job.job_id import JobId
from ._abstract_file import _AbstractFileDataNode
from ._abstract_tabular import _AbstractTabularDataNode
from .data_node import DataNode
from .data_node_id import DataNodeId, Edit


class CSVDataNode(DataNode, _AbstractFileDataNode, _AbstractTabularDataNode):
    """Data Node stored as a CSV file.

    Attributes:
        config_id (str): Identifier of the data node configuration. This string must be a valid
            Python identifier.
        scope (Scope^): The scope of this data node.
        id (str): The unique identifier of this data node.
        owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or `None`.
        parent_ids (Optional[Set[str]]): The identifiers of the parent tasks or `None`.
        last_edit_date (datetime): The date and time of the last modification.
        edits (List[Edit^]): The ordered list of edits of this data node.
        version (str): The string indicating the application version of the data node to
            instantiate. If not provided, the current version is used.
        validity_period (Optional[timedelta]): The duration, as a timedelta from the last edit date, for
            which the data node can be considered up-to-date. Once the validity period has passed, the
            data node is considered stale and relevant tasks will run even if they are skippable (see the
            [Task management page](../core/entities/task-mgt.md) for more details).
            If _validity_period_ is set to `None`, the data node is always up-to-date.
        edit_in_progress (bool): True if a task computing the data node has been submitted
            and not completed yet. False otherwise.
        editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
        editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
        path (str): The path to the CSV file.
        properties (dict[str, Any]): A dictionary of additional properties. The _properties_
            must have a _"default_path"_ or _"path"_ entry with the path of the CSV file:

            - _"default_path"_ `(str)`: The default path of the CSV file.\n
            - _"encoding"_ `(str)`: The encoding of the CSV file. The default value is `utf-8`.\n
            - _"default_data"_: The default data of the data nodes instantiated from this CSV data node.\n
            - _"has_header"_ `(bool)`: If True, indicates that the CSV file has a header.\n
            - _"exposed_type"_: The exposed type of the data read from the CSV file. The default value is `pandas`.\n
  57. """

    __STORAGE_TYPE = "csv"
    __PATH_KEY = "path"
    __DEFAULT_PATH_KEY = "default_path"
    __ENCODING_KEY = "encoding"
    __DEFAULT_DATA_KEY = "default_data"
    __HAS_HEADER_PROPERTY = "has_header"
    _REQUIRED_PROPERTIES: List[str] = []

    def __init__(
        self,
        config_id: str,
        scope: Scope,
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        properties: Optional[Dict] = None,
    ):
        if properties is None:
            properties = {}
        default_value = properties.pop(self.__DEFAULT_DATA_KEY, None)
        if self.__ENCODING_KEY not in properties.keys():
            properties[self.__ENCODING_KEY] = "utf-8"
        if self.__HAS_HEADER_PROPERTY not in properties.keys():
            properties[self.__HAS_HEADER_PROPERTY] = True
        if self._EXPOSED_TYPE_PROPERTY not in properties.keys():
            properties[self._EXPOSED_TYPE_PROPERTY] = self._EXPOSED_TYPE_PANDAS
        elif properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_MODIN:
            # Deprecated in favor of pandas since 3.1.0
            properties[self._EXPOSED_TYPE_PROPERTY] = self._EXPOSED_TYPE_PANDAS
        self._check_exposed_type(properties[self._EXPOSED_TYPE_PROPERTY])
        super().__init__(
            config_id,
            scope,
            id,
            owner_id,
            parent_ids,
            last_edit_date,
            edits,
            version or _VersionManagerFactory._build_manager()._get_latest_version(),
            validity_period,
            edit_in_progress,
            editor_id,
            editor_expiration_date,
            **properties,
        )
        self._path = properties.get(self.__PATH_KEY, properties.get(self.__DEFAULT_PATH_KEY))
        if not self._path:
            self._path = self._build_path(self.storage_type())
        properties[self.__PATH_KEY] = self._path
        if not self._last_edit_date and isfile(self._path):
            self._last_edit_date = datetime.now()
        # Materialize the default data once, if the file does not exist yet, and record that edit.
        if default_value is not None and not os.path.exists(self._path):
            self._write(default_value)
            self._last_edit_date = datetime.now()
            self._edits.append(
                Edit(
                    {
                        "timestamp": self._last_edit_date,
                        "writer_identifier": "TAIPY",
                        "comments": "Default data written.",
                    }
                )
            )
        self._TAIPY_PROPERTIES.update(
            {
                self._EXPOSED_TYPE_PROPERTY,
                self.__PATH_KEY,
                self.__DEFAULT_PATH_KEY,
                self.__ENCODING_KEY,
                self.__DEFAULT_DATA_KEY,
                self.__HAS_HEADER_PROPERTY,
            }
        )
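
    # A minimal sketch of the default-data behavior above (all values illustrative):
    # instantiating a node whose file does not exist yet writes "default_data" to
    # disk once and records a "Default data written." edit.
    #
    #     node = CSVDataNode(
    #         "sales_history",
    #         Scope.SCENARIO,
    #         properties={
    #             "default_path": "sales.csv",
    #             "default_data": {"date": ["2024-01-01"], "nb_sales": [150]},
    #         },
    #     )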

    @classmethod
    def storage_type(cls) -> str:
        return cls.__STORAGE_TYPE

    @property  # type: ignore
    @_self_reload(DataNode._MANAGER_NAME)
    def path(self):
        return self._path

    @path.setter
    def path(self, value):
        tmp_old_path = self._path
        self._path = value
        self.properties[self.__PATH_KEY] = value
        # Keep the backup registry pointing at the new file location.
        _replace_in_backup_file(old_file_path=tmp_old_path, new_file_path=self._path)

    def _read(self):
        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_PANDAS:
            return self._read_as_pandas_dataframe()
        if self.properties[self._EXPOSED_TYPE_PROPERTY] == self._EXPOSED_TYPE_NUMPY:
            return self._read_as_numpy()
        return self._read_as()

    def _read_as(self):
        # The exposed type is a custom class: build one instance per CSV record.
        custom_class = self.properties[self._EXPOSED_TYPE_PROPERTY]
        with open(self._path, encoding=self.properties[self.__ENCODING_KEY]) as csv_file:
            res = []
            if self.properties[self.__HAS_HEADER_PROPERTY]:
                reader = csv.DictReader(csv_file)
                for line in reader:
                    res.append(custom_class(**line))
            else:
                reader = csv.reader(csv_file)
                for line in reader:
                    res.append(custom_class(*line))
            return res
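
    # A minimal sketch of a custom exposed type consumed by _read_as (the class name
    # and fields are illustrative). With a header row, each record is passed as keyword
    # arguments (custom_class(**line)); without one, as positional arguments
    # (custom_class(*line)). All values arrive as strings.
    #
    #     class SaleRow:
    #         def __init__(self, date: str, nb_sales: str):
    #             self.date = date
    #             self.nb_sales = int(nb_sales)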

    def _read_as_numpy(self) -> np.ndarray:
        return self._read_as_pandas_dataframe().to_numpy()

    def _read_as_pandas_dataframe(
        self, usecols: Optional[List[int]] = None, column_names: Optional[List[str]] = None
    ) -> pd.DataFrame:
        try:
            if self.properties[self.__HAS_HEADER_PROPERTY]:
                if column_names:
                    return pd.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY])[column_names]
                return pd.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY])
            else:
                if usecols:
                    return pd.read_csv(
                        self._path, encoding=self.properties[self.__ENCODING_KEY], header=None, usecols=usecols
                    )
                return pd.read_csv(self._path, encoding=self.properties[self.__ENCODING_KEY], header=None)
        except pd.errors.EmptyDataError:
            # An empty CSV file is exposed as an empty DataFrame rather than an error.
            return pd.DataFrame()

    def _append(self, data: Any):
        # Appended rows must not repeat the header, hence header=False.
        if isinstance(data, pd.DataFrame):
            data.to_csv(self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False)
        else:
            pd.DataFrame(data).to_csv(
                self._path, mode="a", index=False, encoding=self.properties[self.__ENCODING_KEY], header=False
            )

    def _write(self, data: Any):
        if isinstance(data, pd.DataFrame):
            data.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
        else:
            pd.DataFrame(data).to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])

    def write_with_column_names(self, data: Any, columns: Optional[List[str]] = None, job_id: Optional[JobId] = None):
        """Write a selection of columns.

        Parameters:
            data (Any): The data to write.
            columns (Optional[List[str]]): The list of column names to write.
            job_id (JobId^): An optional identifier of the writer.
        """
        if not columns:
            df = pd.DataFrame(data)
        else:
            df = pd.DataFrame(data, columns=columns)
        df.to_csv(self._path, index=False, encoding=self.properties[self.__ENCODING_KEY])
        self.track_edit(timestamp=datetime.now(), job_id=job_id)
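

# A minimal usage sketch (illustrative; in an application, data nodes are normally
# created through a scenario configuration rather than instantiated directly):
#
#     node = CSVDataNode("sales_history", Scope.SCENARIO,
#                        properties={"default_path": "sales.csv"})
#     node.write_with_column_names(
#         [["2024-01-01", 150], ["2024-01-02", 175]],
#         columns=["date", "nb_sales"],
#     )
#     df = node.read()  # pandas DataFrame by default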