# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import os
import pathlib
import shutil
from datetime import datetime
from os.path import isfile
from typing import Any, Callable, Dict, Optional

from taipy.config.config import Config
from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._reload import _self_reload
from ..reason import InvalidUploadFile, ReasonCollection, UploadFileCanNotBeRead
from .data_node import DataNode
from .data_node_id import Edit
  23. class _FileDataNodeMixin(object):
  24. """Mixin class designed to handle file-based data nodes
  25. (CSVDataNode, ParquetDataNode, ExcelDataNode, PickleDataNode, JSONDataNode, etc.)."""
  26. __EXTENSION_MAP = {"csv": "csv", "excel": "xlsx", "parquet": "parquet", "pickle": "p", "json": "json"}
  27. _DEFAULT_DATA_KEY = "default_data"
  28. _PATH_KEY = "path"
  29. _DEFAULT_PATH_KEY = "default_path"
  30. _IS_GENERATED_KEY = "is_generated"
  31. __logger = _TaipyLogger._get_logger()
  32. def __init__(self, properties: Dict) -> None:
  33. self._path: str = properties.get(self._PATH_KEY, properties.get(self._DEFAULT_PATH_KEY))
  34. self._is_generated: bool = properties.get(self._IS_GENERATED_KEY, self._path is None)
  35. self._last_edit_date: Optional[datetime] = None
  36. if self._path and ".data" in self._path:
  37. self._path = self._migrate_path(self.storage_type(), self._path) # type: ignore[attr-defined]
  38. if not self._path:
  39. self._path = self._build_path(self.storage_type()) # type: ignore[attr-defined]
  40. properties[self._IS_GENERATED_KEY] = self._is_generated
  41. properties[self._PATH_KEY] = self._path
  42. def _write_default_data(self, default_value: Any):
  43. if default_value is not None and not os.path.exists(self._path):
  44. self._write(default_value) # type: ignore[attr-defined]
  45. self._last_edit_date = DataNode._get_last_modified_datetime(self._path) or datetime.now()
  46. self._edits.append( # type: ignore[attr-defined]
  47. Edit(
  48. {
  49. "timestamp": self._last_edit_date,
  50. "editor": "TAIPY",
  51. "comment": "Default data written.",
  52. }
  53. )
  54. )
  55. if not self._last_edit_date and isfile(self._path):
  56. self._last_edit_date = datetime.now()
  57. @property # type: ignore
  58. @_self_reload(DataNode._MANAGER_NAME)
  59. def is_generated(self) -> bool:
  60. return self._is_generated
  61. @property # type: ignore
  62. @_self_reload(DataNode._MANAGER_NAME)
  63. def path(self) -> Any:
  64. return self._path
  65. @path.setter
  66. def path(self, value):
  67. self._path = value
  68. self.properties[self._PATH_KEY] = value
  69. self.properties[self._IS_GENERATED_KEY] = False
  70. def _build_path(self, storage_type) -> str:
  71. folder = f"{storage_type}s"
  72. dir_path = pathlib.Path(Config.core.storage_folder) / folder
  73. if not dir_path.exists():
  74. dir_path.mkdir(parents=True, exist_ok=True)
  75. return str(dir_path / f"{self.id}.{self.__EXTENSION_MAP.get(storage_type)}") # type: ignore[attr-defined]
  76. def _migrate_path(self, storage_type, old_path) -> str:
  77. new_path = self._build_path(storage_type)
  78. if os.path.exists(old_path):
  79. shutil.move(old_path, new_path)
  80. return new_path
  81. def _get_downloadable_path(self) -> str:
  82. """Get the downloadable path of the file data of the data node.
  83. Returns:
  84. The downloadable path of the file data of the data node if it exists, otherwise an empty string.
  85. """
  86. if os.path.exists(self.path) and isfile(self._path):
  87. return self.path
  88. return ""
  89. def _upload(self, path: str, upload_checker: Optional[Callable[[str, Any], bool]] = None) -> ReasonCollection:
  90. """Upload a file data to the data node.
  91. Parameters:
  92. path (str): The path of the file to upload to the data node.
  93. upload_checker (Optional[Callable[[str, Any], bool]]): A function to check if the upload is allowed.
  94. The function takes the title of the upload data and the data itself as arguments and returns
  95. True if the upload is allowed, otherwise False.
  96. Returns:
  97. True if the upload was successful, otherwise False.
  98. """
  99. from ._data_manager_factory import _DataManagerFactory
  100. reason_collection = ReasonCollection()
  101. upload_path = pathlib.Path(path)
  102. try:
  103. upload_data = self._read_from_path(str(upload_path))
  104. except Exception as err:
  105. self.__logger.error(f"Error while uploading {upload_path.name} to data node {self.id}:") # type: ignore[attr-defined]
  106. self.__logger.error(f"Error: {err}")
  107. reason_collection._add_reason(self.id, UploadFileCanNotBeRead(upload_path.name, self.id)) # type: ignore[attr-defined]
  108. return reason_collection
  109. if upload_checker is not None:
  110. if not upload_checker(upload_path.name, upload_data):
  111. reason_collection._add_reason(self.id, InvalidUploadFile(upload_path.name, self.id)) # type: ignore[attr-defined]
  112. return reason_collection
  113. shutil.copy(upload_path, self.path)
  114. self.track_edit(timestamp=datetime.now()) # type: ignore[attr-defined]
  115. self.unlock_edit() # type: ignore[attr-defined]
  116. _DataManagerFactory._build_manager()._set(self) # type: ignore[arg-type]
  117. return reason_collection
  118. def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
  119. raise NotImplementedError