json.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import dataclasses
  12. import json
  13. from datetime import date, datetime, timedelta
  14. from enum import Enum
  15. from pydoc import locate
  16. from typing import Any, Dict, List, Optional, Set
  17. from .._entity._reload import _Reloader, _self_reload
  18. from .._version._version_manager_factory import _VersionManagerFactory
  19. from ..common.scope import Scope
  20. from ._file_datanode_mixin import _FileDataNodeMixin
  21. from .data_node import DataNode
  22. from .data_node_id import DataNodeId, Edit
  23. class JSONDataNode(DataNode, _FileDataNodeMixin):
  24. """Data Node stored as a JSON file.
  25. The *properties* attribute can contain the following optional entries:
  26. - *default_path* (`str`): The default path of the JSON file used at the instantiation of
  27. the data node.
  28. - *default_data* (`Any`): The default data of the data node. It is used at the data node
  29. instantiation to write the data to the JSON file.
  30. - *encoding* (`str`): The encoding of the JSON file. The default value is `utf-8`.\n
  31. """
  32. __STORAGE_TYPE = "json"
  33. __ENCODING_KEY = "encoding"
  34. _ENCODER_KEY = "encoder"
  35. _DECODER_KEY = "decoder"
  36. _REQUIRED_PROPERTIES: List[str] = []
  37. def __init__(
  38. self,
  39. config_id: str,
  40. scope: Scope,
  41. id: Optional[DataNodeId] = None,
  42. owner_id: Optional[str] = None,
  43. parent_ids: Optional[Set[str]] = None,
  44. last_edit_date: Optional[datetime] = None,
  45. edits: Optional[List[Edit]] = None,
  46. version: Optional[str] = None,
  47. validity_period: Optional[timedelta] = None,
  48. edit_in_progress: bool = False,
  49. editor_id: Optional[str] = None,
  50. editor_expiration_date: Optional[datetime] = None,
  51. properties: Optional[Dict] = None,
  52. ) -> None:
  53. self.id = id or self._new_id(config_id)
  54. if properties is None:
  55. properties = {}
  56. if self.__ENCODING_KEY not in properties.keys():
  57. properties[self.__ENCODING_KEY] = "utf-8"
  58. default_value = properties.pop(self._DEFAULT_DATA_KEY, None)
  59. _FileDataNodeMixin.__init__(self, properties)
  60. DataNode.__init__(
  61. self,
  62. config_id,
  63. scope,
  64. self.id,
  65. owner_id,
  66. parent_ids,
  67. last_edit_date,
  68. edits,
  69. version or _VersionManagerFactory._build_manager()._get_latest_version(),
  70. validity_period,
  71. edit_in_progress,
  72. editor_id,
  73. editor_expiration_date,
  74. **properties,
  75. )
  76. self._decoder = self._properties.get(self._DECODER_KEY, _DefaultJSONDecoder)
  77. self._encoder = self._properties.get(self._ENCODER_KEY, _DefaultJSONEncoder)
  78. with _Reloader():
  79. self._write_default_data(default_value)
  80. self._TAIPY_PROPERTIES.update(
  81. {
  82. self._PATH_KEY,
  83. self._DEFAULT_PATH_KEY,
  84. self._DEFAULT_DATA_KEY,
  85. self._IS_GENERATED_KEY,
  86. self.__ENCODING_KEY,
  87. self._ENCODER_KEY,
  88. self._DECODER_KEY,
  89. }
  90. )
  91. @classmethod
  92. def storage_type(cls) -> str:
  93. """Return the storage type of the data node: "json"."""
  94. return cls.__STORAGE_TYPE
  95. @property # type: ignore
  96. @_self_reload(DataNode._MANAGER_NAME)
  97. def encoder(self) -> json.JSONEncoder:
  98. """The JSON encoder that is used to write into the JSON file."""
  99. return self._encoder
  100. @encoder.setter
  101. def encoder(self, encoder: json.JSONEncoder) -> None:
  102. self.properties[self._ENCODER_KEY] = encoder
  103. @property # type: ignore
  104. @_self_reload(DataNode._MANAGER_NAME)
  105. def decoder(self) -> json.JSONDecoder:
  106. """The JSON decoder that is used to read from the JSON file."""
  107. return self._decoder
  108. @decoder.setter
  109. def decoder(self, decoder: json.JSONDecoder) -> None:
  110. self.properties[self._DECODER_KEY] = decoder
  111. def _read(self):
  112. return self._read_from_path()
  113. def _read_from_path(self, path: Optional[str] = None, **read_kwargs) -> Any:
  114. if path is None:
  115. path = self._path
  116. with open(path, "r", encoding=self.properties[self.__ENCODING_KEY]) as f:
  117. return json.load(f, cls=self._decoder)
  118. def _append(self, data: Any):
  119. with open(self._path, "r+", encoding=self.properties[self.__ENCODING_KEY]) as f:
  120. file_data = json.load(f, cls=self._decoder)
  121. if isinstance(file_data, List):
  122. if isinstance(data, List):
  123. file_data.extend(data)
  124. else:
  125. file_data.append(data)
  126. elif isinstance(data, Dict):
  127. file_data.update(data)
  128. f.seek(0)
  129. json.dump(file_data, f, indent=4, cls=self._encoder)
  130. def _write(self, data: Any):
  131. with open(self._path, "w", encoding=self.properties[self.__ENCODING_KEY]) as f: # type: ignore
  132. json.dump(data, f, indent=4, cls=self._encoder)
  133. class _DefaultJSONEncoder(json.JSONEncoder):
  134. def default(self, o):
  135. if isinstance(o, Enum):
  136. return {
  137. "__type__": f"Enum-{o.__class__.__module__}-{o.__class__.__qualname__}-{o.name}",
  138. "__value__": o.value,
  139. }
  140. if isinstance(o, (datetime, date)):
  141. return {"__type__": "Datetime", "__value__": o.isoformat()}
  142. if dataclasses.is_dataclass(o):
  143. return {
  144. "__type__": f"dataclass-{o.__class__.__module__}-{o.__class__.__qualname__}",
  145. "__value__": dataclasses.asdict(o),
  146. }
  147. return super().default(o)
  148. class _DefaultJSONDecoder(json.JSONDecoder):
  149. def __init__(self, *args, **kwargs):
  150. json.JSONDecoder.__init__(self, *args, **kwargs, object_hook=self.object_hook)
  151. def object_hook(self, source):
  152. if _type := source.get("__type__"):
  153. if _type.startswith("Enum"):
  154. _, module, classname, name = _type.split("-")
  155. _enum_class = locate(f"{module}.{classname}")
  156. return _enum_class[name]
  157. if _type == "Datetime":
  158. return datetime.fromisoformat(source.get("__value__"))
  159. if _type.startswith("dataclass"):
  160. _, module, classname = _type.split("-")
  161. _data_class = locate(f"{module}.{classname}")
  162. return _data_class(**source.get("__value__"))
  163. return source