
refactor: allow expanding read/write FileBasedDataNode with custom path

trgiangdo 2 weeks ago
parent
commit
95226fa8b9

+ 17 - 1
taipy/core/data/_file_datanode_mixin.py

@@ -196,7 +196,7 @@ class _FileDataNodeMixin:
     def _write_default_data(self, default_value: Any):
         if default_value is not None and not os.path.exists(self._path):
             self._write(default_value)  # type: ignore[attr-defined]
-            self._last_edit_date = DataNode._get_last_modified_datetime(self._path) or datetime.now()
+            self._last_edit_date = self._get_last_modified_datetime() or datetime.now()
             self._edits.append(  # type: ignore[attr-defined]
                 Edit(
                     {
@@ -210,6 +210,22 @@ class _FileDataNodeMixin:
         if not self._last_edit_date and isfile(self._path):
             self._last_edit_date = datetime.now()
 
 
+    def _get_last_modified_datetime(self) -> Optional[datetime]:
+        if self._path and os.path.isfile(self._path):
+            return datetime.fromtimestamp(os.path.getmtime(self._path))
+
+        last_modified_datetime = None
+        if self._path and os.path.isdir(self._path):
+            for filename in os.listdir(self._path):
+                filepath = os.path.join(self._path, filename)
+                if os.path.isfile(filepath):
+                    file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
+
+                    if last_modified_datetime is None or file_mtime > last_modified_datetime:
+                        last_modified_datetime = file_mtime
+
+        return last_modified_datetime
+
     def _build_path(self, storage_type) -> str:
         folder = f"{storage_type}s"
         dir_path = pathlib.Path(Config.core.storage_folder) / folder
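
Note: this helper moved here from `DataNode` and now reads `self._path` instead of taking a path argument. For a single file it returns that file's mtime; for a directory it returns the newest mtime among the directory's immediate files (nested subdirectories are ignored). A standalone sketch of the same behavior, independent of taipy:

```python
import os
import tempfile
import time
from datetime import datetime
from typing import Optional

def last_modified(path: str) -> Optional[datetime]:
    # A single file: return its own mtime.
    if os.path.isfile(path):
        return datetime.fromtimestamp(os.path.getmtime(path))
    # A directory: return the newest mtime among its immediate files.
    latest: Optional[datetime] = None
    if os.path.isdir(path):
        for name in os.listdir(path):
            filepath = os.path.join(path, name)
            if os.path.isfile(filepath):
                mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                if latest is None or mtime > latest:
                    latest = mtime
    return latest

with tempfile.TemporaryDirectory() as folder:
    open(os.path.join(folder, "a.csv"), "w").close()
    time.sleep(0.01)
    open(os.path.join(folder, "b.csv"), "w").close()
    print(last_modified(folder))  # b.csv's timestamp: the newest file wins
```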

+ 6 - 3
taipy/core/data/csv.py

@@ -25,7 +25,7 @@ from .data_node import DataNode
 from .data_node_id import DataNodeId, Edit
 
 
 
 
-class CSVDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
+class CSVDataNode(_FileDataNodeMixin, DataNode, _TabularDataNodeMixin):
     """Data Node stored as a CSV file.
     """Data Node stored as a CSV file.
 
 
     The *properties* attribute can contain the following optional entries:
     The *properties* attribute can contain the following optional entries:
@@ -122,7 +122,7 @@ class CSVDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
             columns (Optional[List[str]]): The list of column names to write.
             editor_id (str): An optional identifier of the writer.
         """
-        self._write(data, columns)
+        self._write_to_path(self._path, data, columns)
         self.track_edit(editor_id=editor_id, timestamp=datetime.now())

     def _read(self):
@@ -178,6 +178,9 @@ class CSVDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         data.to_csv(self._path, mode="a", index=False, encoding=properties[self.__ENCODING_KEY], header=False)

     def _write(self, data: Any, columns: Optional[List[str]] = None):
+        self._write_to_path(self._path, data, columns)
+
+    def _write_to_path(self, path: str, data: Any, columns: Optional[List[str]] = None):
         properties = self.properties
         exposed_type = properties[self._EXPOSED_TYPE_PROPERTY]
         data = self._convert_data_to_dataframe(exposed_type, data)
@@ -186,7 +189,7 @@ class CSVDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
             data.columns = pd.Index(columns, dtype="object")

         data.to_csv(
-            self._path,
+            path,
             index=False,
             encoding=properties[self.__ENCODING_KEY],
             header=properties[self._HAS_HEADER_PROPERTY],
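
Putting `_FileDataNodeMixin` before `DataNode` in the base list is not cosmetic: `DataNode` now declares a `_get_last_modified_datetime` stub that raises `NotImplementedError` (see the `data_node.py` hunk below), so the mixin's real implementation must come first in the MRO to win the lookup. A minimal sketch of the rule, with toy names standing in for the taipy classes:

```python
class Base:  # stands in for DataNode
    def helper(self):
        raise NotImplementedError

class Mixin:  # stands in for _FileDataNodeMixin
    def helper(self):
        return "mixin implementation"

class MixinFirst(Mixin, Base):  # new base order: the mixin wins
    pass

class BaseFirst(Base, Mixin):   # old base order: the stub shadows the mixin
    pass

print(MixinFirst().helper())  # -> "mixin implementation"
try:
    BaseFirst().helper()
except NotImplementedError:
    print("Base.helper shadowed the mixin")
```

The same reorder is applied to ExcelDataNode, JSONDataNode, ParquetDataNode, and PickleDataNode below.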

+ 22 - 31
taipy/core/data/data_node.py

@@ -10,7 +10,6 @@
 # specific language governing permissions and limitations under the License.

 import functools
-import os
 import typing
 import uuid
 from abc import abstractmethod
@@ -211,7 +210,11 @@ class DataNode(_Entity, _Labeled):
     @_self_reload(_MANAGER_NAME)
     def last_edit_date(self) -> Optional[datetime]:
         """The date and time of the last modification."""
-        last_modified_datetime = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None))
+        try:
+            last_modified_datetime = self._get_last_modified_datetime()
+        except NotImplementedError:
+            last_modified_datetime = None
+
         if last_modified_datetime and last_modified_datetime > self._last_edit_date:  # type: ignore
             return last_modified_datetime
         else:
@@ -504,7 +507,10 @@ class DataNode(_Entity, _Labeled):
         if comment:
             edit[EDIT_COMMENT_KEY] = comment
         if not timestamp:
-            timestamp = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY)) or datetime.now()
+            try:
+                timestamp = self._get_last_modified_datetime() or datetime.now()
+            except NotImplementedError:
+                timestamp = datetime.now()
         edit[EDIT_TIMESTAMP_KEY] = timestamp
         self.last_edit_date = edit.get(EDIT_TIMESTAMP_KEY)
         self._edits.append(typing.cast(Edit, edit))
@@ -707,38 +713,23 @@ class DataNode(_Entity, _Labeled):
         """Get user properties."""
         """Get user properties."""
         return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}
         return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}
 
 
-    @classmethod
-    def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:
-        if path and os.path.isfile(path):
-            return datetime.fromtimestamp(os.path.getmtime(path))
-
-        last_modified_datetime = None
-        if path and os.path.isdir(path):
-            for filename in os.listdir(path):
-                filepath = os.path.join(path, filename)
-                if os.path.isfile(filepath):
-                    file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
-
-                    if last_modified_datetime is None or file_mtime > last_modified_datetime:
-                        last_modified_datetime = file_mtime
-
-        return last_modified_datetime
+    def _get_last_modified_datetime(self) -> Optional[datetime]:
+        raise NotImplementedError
 
 
     @staticmethod
     def _class_map():
-        def all_subclasses(cls):
-            subclasses = set(cls.__subclasses__())
-            for s in cls.__subclasses__():
-                subclasses.update(all_subclasses(s))
-            return subclasses
-
         class_map = {}
-        for c in all_subclasses(DataNode):
-            try:
-                if c.storage_type() is not None:
-                    class_map[c.storage_type()] = c
-            except NotImplementedError:
-                pass
+        classes_stack = [DataNode]
+        while classes_stack:
+            current_class = classes_stack.pop()
+            for subclass in current_class.__subclasses__():
+                try:
+                    if subclass.storage_type() is not None:
+                        class_map[subclass.storage_type()] = subclass
+                except NotImplementedError:
+                    pass
+
+                classes_stack.append(subclass)
 
 
         return class_map
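
`_class_map` is also rewritten from a recursive `all_subclasses` closure to an explicit stack; both visit the full subclass tree, but the iterative form drops the nested helper. A self-contained sketch of the same traversal over toy classes:

```python
class Node:
    @classmethod
    def storage_type(cls):
        raise NotImplementedError

class FileBased(Node):  # abstract intermediate with no storage type of its own
    pass

class CsvLike(FileBased):
    @classmethod
    def storage_type(cls):
        return "csv"

class InMemoryLike(Node):
    @classmethod
    def storage_type(cls):
        return "in_memory"

def class_map(root):
    mapping = {}
    stack = [root]             # explicit stack instead of recursion
    while stack:
        current = stack.pop()
        for sub in current.__subclasses__():
            try:
                if sub.storage_type() is not None:
                    mapping[sub.storage_type()] = sub
            except NotImplementedError:
                pass           # abstract intermediates contribute no entry...
            stack.append(sub)  # ...but their subclasses are still visited

    return mapping

print(sorted(class_map(Node)))  # ['csv', 'in_memory']: CsvLike is found through FileBased
```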
 
 

+ 8 - 8
taipy/core/data/excel.py

@@ -26,7 +26,7 @@ from .data_node import DataNode
 from .data_node_id import DataNodeId, Edit
 
 
 
 
-class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
+class ExcelDataNode(_FileDataNodeMixin, DataNode, _TabularDataNodeMixin):
     """Data Node stored as an Excel file.
     """Data Node stored as an Excel file.
 
 
     The Excel file format is _xlsx_.
     The Excel file format is _xlsx_.
@@ -126,7 +126,7 @@ class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
             editor_id (Optional[str]): An optional identifier of the writer.
         """
         if isinstance(data, Dict) and all(isinstance(x, (pd.DataFrame, np.ndarray)) for x in data.values()):
-            self._write_excel_with_multiple_sheets(data, columns=columns)
+            self._write_excel_with_multiple_sheets(self._path, data, columns=columns)
         else:
             df = pd.DataFrame(data)
             if columns:
@@ -306,19 +306,19 @@ class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         else:
             self._append_excel_with_single_sheet(pd.DataFrame(data).to_excel, index=False, header=False)
 
 
-    def _write_excel_with_single_sheet(self, write_excel_fct, *args, **kwargs):
+    def _write_excel_with_single_sheet(self, write_excel_fct, path, *args, **kwargs):
         if sheet_name := self.properties.get(self.__SHEET_NAME_PROPERTY):
             if not isinstance(sheet_name, str):
                 if len(sheet_name) > 1:
                     raise SheetNameLengthMismatch
                 else:
                     sheet_name = sheet_name[0]
-            write_excel_fct(*args, **kwargs, sheet_name=sheet_name)
+            write_excel_fct(path, *args, **kwargs, sheet_name=sheet_name)
         else:
-            write_excel_fct(*args, **kwargs)
+            write_excel_fct(path, *args, **kwargs)
 
 
-    def _write_excel_with_multiple_sheets(self, data: Any, columns: List[str] = None):
-        with pd.ExcelWriter(self._path) as writer:
+    def _write_excel_with_multiple_sheets(self, path: str, data: Any, columns: List[str] = None):
+        with pd.ExcelWriter(path) as writer:
             # Each key stands for a sheet name
             properties = self.properties
             for key in data.keys():
@@ -331,7 +331,7 @@ class ExcelDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
 
 
     def _write(self, data: Any):
         if isinstance(data, Dict):
-            return self._write_excel_with_multiple_sheets(data)
+            return self._write_excel_with_multiple_sheets(self._path, data)
         else:
             properties = self.properties
             data = self._convert_data_to_dataframe(properties[self._EXPOSED_TYPE_PROPERTY], data)
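
Here the single- and multi-sheet writers take the destination path explicitly instead of reading `self._path`. For reference, a minimal standalone example of the pandas pattern that `_write_excel_with_multiple_sheets` wraps, where each dict key becomes a sheet name (the file and sheet names are illustrative, and an Excel engine such as openpyxl must be installed):

```python
import pandas as pd

frames = {
    "sales": pd.DataFrame({"q": [1, 2], "amount": [10.0, 12.5]}),
    "costs": pd.DataFrame({"q": [1, 2], "amount": [7.0, 8.0]}),
}

# One sheet per dict entry, written to an explicitly chosen path.
with pd.ExcelWriter("report.xlsx") as writer:
    for name, df in frames.items():
        df.to_excel(writer, sheet_name=name, index=False)
```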

+ 5 - 2
taipy/core/data/json.py

@@ -24,7 +24,7 @@ from .data_node import DataNode
 from .data_node_id import DataNodeId, Edit
 
 
 
 
-class JSONDataNode(DataNode, _FileDataNodeMixin):
+class JSONDataNode(_FileDataNodeMixin, DataNode):
     """Data Node stored as a JSON file.
     """Data Node stored as a JSON file.
 
 
     The *properties* attribute can contain the following optional entries:
     The *properties* attribute can contain the following optional entries:
@@ -154,7 +154,10 @@ class JSONDataNode(DataNode, _FileDataNodeMixin):
             json.dump(file_data, f, indent=4, cls=self._encoder)

     def _write(self, data: Any):
-        with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f:  # type: ignore
+        self._write_to_path(self._path, data)
+
+    def _write_to_path(self, path: str, data: Any):
+        with open(path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f:  # type: ignore
             json.dump(data, f, indent=4, cls=self._encoder)
 
 
 
 

+ 6 - 5
taipy/core/data/parquet.py

@@ -26,7 +26,7 @@ from .data_node import DataNode
 from .data_node_id import DataNodeId, Edit
 
 
 
 
-class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
+class ParquetDataNode(_FileDataNodeMixin, DataNode, _TabularDataNodeMixin):
     """Data Node stored as a Parquet file.
     """Data Node stored as a Parquet file.
 
 
     The *properties* attribute can contain the following optional entries:
     The *properties* attribute can contain the following optional entries:
@@ -161,12 +161,13 @@ class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         """Return the storage type of the data node: "parquet"."""
         """Return the storage type of the data node: "parquet"."""
         return cls.__STORAGE_TYPE
         return cls.__STORAGE_TYPE
 
 
-    def _write_with_kwargs(self, data: Any, editor_id: Optional[str] = None, **write_kwargs):
+    def _write_to_path(self, path: str, data: Any, editor_id: Optional[str] = None, **write_kwargs):
         """Write the data referenced by this data node.
         """Write the data referenced by this data node.
 
 
         Keyword arguments here which are also present in the Data Node config will overwrite them.
         Keyword arguments here which are also present in the Data Node config will overwrite them.
 
 
         Arguments:
         Arguments:
+            path (str): The path to the Parquet file or directory.
             data (Any): The data to write.
             editor_id (str): An optional identifier of the writer.
             **write_kwargs (dict[str, any]): The keyword arguments passed to the function
@@ -186,7 +187,7 @@ class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
 
 
         # Ensure that the columns are strings, otherwise writing will fail with pandas 1.3.5
         df.columns = df.columns.astype(str)
-        df.to_parquet(self._path, **kwargs)
+        df.to_parquet(path, **kwargs)
 
 
     def read_with_kwargs(self, **read_kwargs):
         """Read data from this data node.
@@ -243,7 +244,7 @@ class ParquetDataNode(DataNode, _FileDataNodeMixin, _TabularDataNodeMixin):
         return pd.read_parquet(path, **read_kwargs)

     def _append(self, data: Any):
-        self._write_with_kwargs(data, engine="fastparquet", append=True)
+        self._write_to_path(self._path, data, engine="fastparquet", append=True)
 
 
     def _write(self, data: Any):
-        self._write_with_kwargs(data)
+        self._write_to_path(self._path, data)
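
`_append` now routes through `_write_to_path` as well, forwarding `engine="fastparquet"` and `append=True` down to `DataFrame.to_parquet`. A minimal sketch of that append behavior, assuming the fastparquet package is installed (pyarrow, the other pandas engine, does not accept `append`):

```python
import pandas as pd

path = "data.parquet"  # illustrative file name
pd.DataFrame({"a": [1, 2]}).to_parquet(path, engine="fastparquet")

# Extra keyword arguments are forwarded to the engine; fastparquet
# supports appending row groups to an existing file.
pd.DataFrame({"a": [3, 4]}).to_parquet(path, engine="fastparquet", append=True)

print(pd.read_parquet(path))  # four rows: 1, 2, 3, 4
```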

+ 5 - 2
taipy/core/data/pickle.py

@@ -21,7 +21,7 @@ from .data_node import DataNode
 from .data_node_id import DataNodeId, Edit
 
 
 
 
-class PickleDataNode(DataNode, _FileDataNodeMixin):
+class PickleDataNode(_FileDataNodeMixin, DataNode):
     """Data Node stored as a pickle file.
     """Data Node stored as a pickle file.
 
 
     The *properties* attribute can contain the following optional entries:
     The *properties* attribute can contain the following optional entries:
@@ -119,5 +119,8 @@ class PickleDataNode(DataNode, _FileDataNodeMixin):
             return pickle.load(pf)

     def _write(self, data):
-        with open(self._path, "wb") as pf:
+        self._write_to_path(self._path, data)
+
+    def _write_to_path(self, path: str, data: Any):
+        with open(path, "wb") as pf:
             pickle.dump(data, pf)
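
The same split recurs in every file-based node of this commit: `_write(data)` keeps its signature and delegates to a new `_write_to_path(path, data)` that holds the actual serialization, which is what lets callers and subclasses target a custom path. A toy sketch of the pattern (the class and file names are illustrative, not taipy API):

```python
import pickle
from typing import Any

class ToyPickleNode:
    def __init__(self, path: str):
        self._path = path

    def _write(self, data: Any) -> None:
        # Default behavior: write to the node's configured path.
        self._write_to_path(self._path, data)

    def _write_to_path(self, path: str, data: Any) -> None:
        # Path-agnostic serialization lives here.
        with open(path, "wb") as pf:
            pickle.dump(data, pf)

node = ToyPickleNode("node.p")
node._write({"x": 1})                      # written to node.p
node._write_to_path("backup.p", {"x": 1})  # same format, custom destination
```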

+ 1 - 1
tests/core/data/test_write_parquet_data_node.py

@@ -183,7 +183,7 @@ class TestWriteParquetDataNode:
             },
         )
         _DataManagerFactory._build_manager()._repository._save(dn)
-        dn._write_with_kwargs(df, compression=comp1)
+        dn._write_to_path(temp_file_path, df, compression=comp1)
         df.to_parquet(path=temp_file_2_path, compression=comp1, engine=engine)
         with open(temp_file_2_path, "rb") as tf:
             with pathlib.Path(temp_file_path).open("rb") as f: