excel.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from datetime import datetime, timedelta
  12. from typing import Any, Dict, List, Optional, Set, Union
  13. import numpy as np
  14. import pandas as pd
  15. from openpyxl import load_workbook
  16. from taipy.config.common.scope import Scope
  17. from .._entity._reload import _Reloader
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from ..exceptions.exceptions import ExposedTypeLengthMismatch, NonExistingExcelSheet, SheetNameLengthMismatch
  20. from ..job.job_id import JobId
  21. from ._file_datanode_mixin import _FileDataNodeMixin
  22. from ._tabular_datanode_mixin import _TabularDataNodeMixin
  23. from .data_node import DataNode
  24. from .data_node_id import DataNodeId, Edit
  25. class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
  26. """Data Node stored as an Excel file.
  27. The Excel file format is _xlsx_.
  28. Attributes:
  29. config_id (str): Identifier of this data node configuration. It must be a valid Python
  30. identifier.
  31. scope (Scope^): The scope of this data node.
  32. id (str): The unique identifier of this data node.
  33. owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
  34. `None`.
  35. parent_ids (Optional[Set[str]]): The identifiers of the parent tasks or `None`.
  36. last_edit_date (datetime): The date and time of the last modification.
  37. edits (List[Edit^]): The ordered list of edits for that job.
  38. version (str): The application version of the data node to instantiate. If not provided,
  39. the latest version is used.
  40. validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
  41. which the data node can be considered up-to-date. Once the validity period has passed, the data node is
  42. considered stale and relevant tasks will run even if they are skippable (see the
  43. [Task management page](../core/entities/task-mgt.md) for more details).
  44. If _validity_period_ is set to `None`, the data node is always up-to-date.
  45. edit_in_progress (bool): True if a task computing the data node has been submitted
  46. and not completed yet. False otherwise.
  47. editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
  48. editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
  49. path (str): The path to the Excel file.
  50. properties (dict[str, Any]): A dictionary of additional properties. The _properties_
  51. must have a _"default_path"_ or _"path"_ entry with the path of the Excel file:
  52. - _"default_path"_ `(str)`: The path of the Excel file.\n
  53. - _"has_header"_ `(bool)`: If True, indicates that the Excel file has a header.\n
  54. - _"sheet_name"_ `(Union[List[str], str])`: The list of sheet names to be used. This
  55. can be a unique name.\n
  56. - _"exposed_type"_: The exposed type of the data read from Excel file. The default value is `pandas`.\n
  57. """
    # Storage type identifier returned by `storage_type()`.
    __STORAGE_TYPE = "excel"
    # Key of the property holding the sheet name (or collection of sheet names).
    __SHEET_NAME_PROPERTY = "sheet_name"
    # No property is declared as required at this level.
    _REQUIRED_PROPERTIES: List[str] = []
  61. def __init__(
  62. self,
  63. config_id: str,
  64. scope: Scope,
  65. id: Optional[DataNodeId] = None,
  66. owner_id: Optional[str] = None,
  67. parent_ids: Optional[Set[str]] = None,
  68. last_edit_date: Optional[datetime] = None,
  69. edits: List[Edit] = None,
  70. version: str = None,
  71. validity_period: Optional[timedelta] = None,
  72. edit_in_progress: bool = False,
  73. editor_id: Optional[str] = None,
  74. editor_expiration_date: Optional[datetime] = None,
  75. properties: Dict = None,
  76. ) -> None:
  77. self.id = id or self._new_id(config_id)
  78. if properties is None:
  79. properties = {}
  80. if self.__SHEET_NAME_PROPERTY not in properties.keys():
  81. properties[self.__SHEET_NAME_PROPERTY] = None
  82. if self._HAS_HEADER_PROPERTY not in properties.keys():
  83. properties[self._HAS_HEADER_PROPERTY] = True
  84. properties[self._EXPOSED_TYPE_PROPERTY] = _TabularDataNodeMixin._get_valid_exposed_type(properties)
  85. self._check_exposed_type(properties[self._EXPOSED_TYPE_PROPERTY])
  86. default_value = properties.pop(self._DEFAULT_DATA_KEY, None)
  87. _FileDataNodeMixin.__init__(self, properties)
  88. _TabularDataNodeMixin.__init__(self, **properties)
  89. DataNode.__init__(
  90. self,
  91. config_id,
  92. scope,
  93. self.id,
  94. owner_id,
  95. parent_ids,
  96. last_edit_date,
  97. edits,
  98. version or _VersionManagerFactory._build_manager()._get_latest_version(),
  99. validity_period,
  100. edit_in_progress,
  101. editor_id,
  102. editor_expiration_date,
  103. **properties,
  104. )
  105. with _Reloader():
  106. self._write_default_data(default_value)
  107. self._TAIPY_PROPERTIES.update(
  108. {
  109. self._PATH_KEY,
  110. self._DEFAULT_PATH_KEY,
  111. self._DEFAULT_DATA_KEY,
  112. self._IS_GENERATED_KEY,
  113. self._HAS_HEADER_PROPERTY,
  114. self._EXPOSED_TYPE_PROPERTY,
  115. self.__SHEET_NAME_PROPERTY,
  116. }
  117. )
  118. @classmethod
  119. def storage_type(cls) -> str:
  120. return cls.__STORAGE_TYPE
  121. @staticmethod
  122. def _check_exposed_type(exposed_type):
  123. if isinstance(exposed_type, str):
  124. _TabularDataNodeMixin._check_exposed_type(exposed_type)
  125. elif isinstance(exposed_type, list):
  126. for t in exposed_type:
  127. _TabularDataNodeMixin._check_exposed_type(t)
  128. elif isinstance(exposed_type, dict):
  129. for t in exposed_type.values():
  130. _TabularDataNodeMixin._check_exposed_type(t)
  131. def _read(self):
  132. return self._read_from_path()
  133. def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
  134. if path is None:
  135. path = self._path
  136. exposed_type = self.properties[self._EXPOSED_TYPE_PROPERTY]
  137. if exposed_type == self._EXPOSED_TYPE_PANDAS:
  138. return self._read_as_pandas_dataframe(path=path)
  139. if exposed_type == self._EXPOSED_TYPE_NUMPY:
  140. return self._read_as_numpy(path=path)
  141. return self._read_as(path=path)
  142. def _read_sheet_with_exposed_type(
  143. self, path: str, sheet_exposed_type: str, sheet_name: str
  144. ) -> Optional[Union[np.ndarray, pd.DataFrame]]:
  145. if sheet_exposed_type == self._EXPOSED_TYPE_NUMPY:
  146. return self._read_as_numpy(path, sheet_name)
  147. elif sheet_exposed_type == self._EXPOSED_TYPE_PANDAS:
  148. return self._read_as_pandas_dataframe(path, sheet_name)
  149. return None
  150. def _read_as(self, path: str):
  151. try:
  152. properties = self.properties
  153. excel_file = load_workbook(path)
  154. exposed_type = properties[self._EXPOSED_TYPE_PROPERTY]
  155. work_books = {}
  156. sheet_names = excel_file.sheetnames
  157. user_provided_sheet_names = properties.get(self.__SHEET_NAME_PROPERTY) or []
  158. if not isinstance(user_provided_sheet_names, (list, set, tuple)):
  159. user_provided_sheet_names = [user_provided_sheet_names]
  160. provided_sheet_names = user_provided_sheet_names or sheet_names
  161. for sheet_name in provided_sheet_names:
  162. if sheet_name not in sheet_names:
  163. raise NonExistingExcelSheet(sheet_name, path)
  164. if isinstance(exposed_type, List):
  165. if len(provided_sheet_names) != len(exposed_type):
  166. raise ExposedTypeLengthMismatch(
  167. f"Expected {len(provided_sheet_names)} exposed types, got " f"{len(exposed_type)}"
  168. )
  169. for i, sheet_name in enumerate(provided_sheet_names):
  170. work_sheet = excel_file[sheet_name]
  171. sheet_exposed_type = exposed_type
  172. if not isinstance(sheet_exposed_type, str):
  173. if isinstance(exposed_type, dict):
  174. sheet_exposed_type = exposed_type.get(sheet_name, self._EXPOSED_TYPE_PANDAS)
  175. elif isinstance(exposed_type, List):
  176. sheet_exposed_type = exposed_type[i]
  177. if isinstance(sheet_exposed_type, str):
  178. sheet_data = self._read_sheet_with_exposed_type(path, sheet_exposed_type, sheet_name)
  179. if sheet_data is not None:
  180. work_books[sheet_name] = sheet_data
  181. continue
  182. res = [[col.value for col in row] for row in work_sheet.rows]
  183. if properties[self._HAS_HEADER_PROPERTY] and res:
  184. header = res.pop(0)
  185. for i, row in enumerate(res):
  186. res[i] = sheet_exposed_type(**dict([[h, r] for h, r in zip(header, row)]))
  187. else:
  188. for i, row in enumerate(res):
  189. res[i] = sheet_exposed_type(*row)
  190. work_books[sheet_name] = res
  191. finally:
  192. excel_file.close()
  193. if len(user_provided_sheet_names) == 1:
  194. return work_books[user_provided_sheet_names[0]]
  195. return work_books
  196. def _read_as_numpy(self, path: str, sheet_names=None):
  197. sheets = self._read_as_pandas_dataframe(path=path, sheet_names=sheet_names)
  198. if isinstance(sheets, dict):
  199. return {sheet_name: df.to_numpy() for sheet_name, df in sheets.items()}
  200. return sheets.to_numpy()
  201. def _do_read_excel(
  202. self, path: str, sheet_names, kwargs
  203. ) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
  204. return pd.read_excel(path, sheet_name=sheet_names, **kwargs)
  205. def __get_sheet_names_and_header(self, sheet_names):
  206. kwargs = {}
  207. properties = self.properties
  208. if sheet_names is None:
  209. sheet_names = properties[self.__SHEET_NAME_PROPERTY]
  210. if not properties[self._HAS_HEADER_PROPERTY]:
  211. kwargs["header"] = None
  212. return sheet_names, kwargs
  213. def _read_as_pandas_dataframe(
  214. self, path: str, sheet_names=None
  215. ) -> Union[Dict[Union[int, str], pd.DataFrame], pd.DataFrame]:
  216. sheet_names, kwargs = self.__get_sheet_names_and_header(sheet_names)
  217. try:
  218. return self._do_read_excel(path, sheet_names, kwargs)
  219. except pd.errors.EmptyDataError:
  220. return pd.DataFrame()
  221. def _append_excel_with_single_sheet(self, append_excel_fct, *args, **kwargs):
  222. sheet_name = self.properties.get(self.__SHEET_NAME_PROPERTY)
  223. with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
  224. if sheet_name:
  225. if not isinstance(sheet_name, str):
  226. sheet_name = sheet_name[0]
  227. append_excel_fct(
  228. writer, *args, **kwargs, sheet_name=sheet_name, startrow=writer.sheets[sheet_name].max_row
  229. )
  230. else:
  231. sheet_name = list(writer.sheets.keys())[0]
  232. append_excel_fct(writer, *args, **kwargs, startrow=writer.sheets[sheet_name].max_row)
  233. def _set_column_if_dataframe(self, data: Any, columns) -> Union[pd.DataFrame, Any]:
  234. if isinstance(data, pd.DataFrame):
  235. data.columns = pd.Index(columns, dtype="object")
  236. return data
  237. def _append_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
  238. with pd.ExcelWriter(self._path, mode="a", engine="openpyxl", if_sheet_exists="overlay") as writer:
  239. # Each key stands for a sheet name
  240. for sheet_name in data.keys():
  241. if isinstance(data[sheet_name], np.ndarray):
  242. df = pd.DataFrame(data[sheet_name])
  243. else:
  244. df = data[sheet_name]
  245. if columns:
  246. df = self._set_column_if_dataframe(df, columns)
  247. df.to_excel(
  248. writer, sheet_name=sheet_name, index=False, header=False, startrow=writer.sheets[sheet_name].max_row
  249. )
  250. def _append(self, data: Any):
  251. from importlib.metadata import version
  252. if version("pandas") < "1.4":
  253. raise ImportError("The append method is only available for pandas version 1.4 or higher.")
  254. if isinstance(data, Dict) and all(isinstance(x, (pd.DataFrame, np.ndarray)) for x in data.values()):
  255. self._append_excel_with_multiple_sheets(data)
  256. elif isinstance(data, pd.DataFrame):
  257. self._append_excel_with_single_sheet(data.to_excel, index=False, header=False)
  258. else:
  259. self._append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)
  260. def _write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
  261. if sheet_name := self.properties.get(self.__SHEET_NAME_PROPERTY):
  262. if not isinstance(sheet_name, str):
  263. if len(sheet_name) > 1:
  264. raise SheetNameLengthMismatch
  265. else:
  266. sheet_name = sheet_name[0]
  267. write_excel_fct(*args, **kwargs, sheet_name=sheet_name)
  268. else:
  269. write_excel_fct(*args, **kwargs)
  270. def _write_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
  271. with pd.ExcelWriter(self._path) as writer:
  272. # Each key stands for a sheet name
  273. properties = self.properties
  274. for key in data.keys():
  275. df = self._convert_data_to_dataframe(properties[self._EXPOSED_TYPE_PROPERTY], data[key])
  276. if columns:
  277. df = self._set_column_if_dataframe(df, columns)
  278. df.to_excel(writer, key, index=False, header=properties[self._HAS_HEADER_PROPERTY] or False)
  279. def _write(self, data: Any):
  280. if isinstance(data, Dict):
  281. return self._write_excel_with_multiple_sheets(data)
  282. else:
  283. properties = self.properties
  284. data = self._convert_data_to_dataframe(properties[self._EXPOSED_TYPE_PROPERTY], data)
  285. self._write_excel_with_single_sheet(
  286. data.to_excel, self._path, index=False, header=properties[self._HAS_HEADER_PROPERTY] or None
  287. )
  288. def write_with_column_names(self, data: Any, columns: List[str] = None, job_id: Optional[JobId] = None):
  289. """Write a set of columns.
  290. Parameters:
  291. data (Any): The data to write.
  292. columns (List[str]): The list of column names to write.
  293. job_id (JobId^): An optional identifier of the writer.
  294. """
  295. if isinstance(data, Dict) and all(isinstance(x, (pd.DataFrame, np.ndarray)) for x in data.values()):
  296. self._write_excel_with_multiple_sheets(data, columns=columns)
  297. else:
  298. df = pd.DataFrame(data)
  299. if columns:
  300. df = self._set_column_if_dataframe(df, columns)
  301. self._write_excel_with_single_sheet(df.to_excel, self.path, index=False)
  302. self.track_edit(timestamp=datetime.now(), job_id=job_id)