mongo.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from datetime import datetime, timedelta
  12. from importlib import util
  13. from inspect import isclass
  14. from typing import Any, Dict, List, Optional, Set, Tuple, Union
  15. from .._version._version_manager_factory import _VersionManagerFactory
  16. from ..common._check_dependencies import _check_dependency_is_installed
  17. from ..common.scope import Scope
  18. if util.find_spec("pymongo"):
  19. from ..common._mongo_connector import _connect_mongodb
  20. from ..data.operator import JoinOperator, Operator
  21. from ..exceptions.exceptions import InvalidCustomDocument, MissingRequiredProperty
  22. from .data_node import DataNode
  23. from .data_node_id import DataNodeId, Edit
  24. class MongoCollectionDataNode(DataNode):
  25. """Data Node stored in a Mongo collection.
  26. The *properties* attribute must contain the following mandatory entries:
  27. - *db_name* (`str`): The database name.
  28. - *collection_name* (`str`): The collection in the database to read from and to write the data to.
  29. The *properties* attribute can also contain the following optional entries:
  30. - *custom_document* (`Any`): The custom document class to store, encode, and decode data when reading
  31. and writing to a Mongo collection.
  32. - *db_username* (`str`): The database username.
  33. - *db_password* (`str`): The database password.
  34. - *db_host* (`str`): The database host. The default value is *"localhost"*.
  35. - *db_port* (`str`): The database port. The default value is *27017*.
  36. - *db_driver* (`str`): The database driver.
  37. - *db_extra_args* (`Dict[str, Any]`): A dictionary of additional arguments to be passed into
  38. database connection string.
  39. """
  40. __STORAGE_TYPE = "mongo_collection"
  41. __DB_NAME_KEY = "db_name"
  42. __COLLECTION_KEY = "collection_name"
  43. __DB_USERNAME_KEY = "db_username"
  44. __DB_PASSWORD_KEY = "db_password"
  45. __DB_HOST_KEY = "db_host"
  46. __DB_PORT_KEY = "db_port"
  47. __DB_EXTRA_ARGS_KEY = "db_extra_args"
  48. __DB_DRIVER_KEY = "db_driver"
  49. __DB_HOST_DEFAULT = "localhost"
  50. __DB_PORT_DEFAULT = 27017
  51. _CUSTOM_DOCUMENT_PROPERTY = "custom_document"
  52. _REQUIRED_PROPERTIES: List[str] = [
  53. __DB_NAME_KEY,
  54. __COLLECTION_KEY,
  55. ]
  56. def __init__(
  57. self,
  58. config_id: str,
  59. scope: Scope,
  60. id: Optional[DataNodeId] = None,
  61. owner_id: Optional[str] = None,
  62. parent_ids: Optional[Set[str]] = None,
  63. last_edit_date: Optional[datetime] = None,
  64. edits: List[Edit] = None,
  65. version: str = None,
  66. validity_period: Optional[timedelta] = None,
  67. edit_in_progress: bool = False,
  68. editor_id: Optional[str] = None,
  69. editor_expiration_date: Optional[datetime] = None,
  70. properties: Dict = None,
  71. ) -> None:
  72. _check_dependency_is_installed("Mongo Data Node", "pymongo", "mongo")
  73. if properties is None:
  74. properties = {}
  75. required = self._REQUIRED_PROPERTIES
  76. if missing := set(required) - set(properties.keys()):
  77. raise MissingRequiredProperty(
  78. f"The following properties {', '.join(missing)} were not informed and are required."
  79. )
  80. self._check_custom_document(properties[self._CUSTOM_DOCUMENT_PROPERTY])
  81. super().__init__(
  82. config_id,
  83. scope,
  84. id,
  85. owner_id,
  86. parent_ids,
  87. last_edit_date,
  88. edits,
  89. version or _VersionManagerFactory._build_manager()._get_latest_version(),
  90. validity_period,
  91. edit_in_progress,
  92. editor_id,
  93. editor_expiration_date,
  94. **properties,
  95. )
  96. mongo_client = _connect_mongodb(
  97. db_host=properties.get(self.__DB_HOST_KEY, self.__DB_HOST_DEFAULT),
  98. db_port=properties.get(self.__DB_PORT_KEY, self.__DB_PORT_DEFAULT),
  99. db_username=properties.get(self.__DB_USERNAME_KEY, ""),
  100. db_password=properties.get(self.__DB_PASSWORD_KEY, ""),
  101. db_driver=properties.get(self.__DB_DRIVER_KEY, ""),
  102. db_extra_args=frozenset(properties.get(self.__DB_EXTRA_ARGS_KEY, {}).items()),
  103. )
  104. self.collection = mongo_client[properties.get(self.__DB_NAME_KEY, "")][
  105. properties.get(self.__COLLECTION_KEY, "")
  106. ]
  107. self.custom_document = properties[self._CUSTOM_DOCUMENT_PROPERTY]
  108. self._decoder = self._default_decoder
  109. custom_decoder = getattr(self.custom_document, "decode", None)
  110. if callable(custom_decoder):
  111. self._decoder = custom_decoder
  112. self._encoder = self._default_encoder
  113. custom_encoder = getattr(self.custom_document, "encode", None)
  114. if callable(custom_encoder):
  115. self._encoder = custom_encoder
  116. if not self._last_edit_date: # type: ignore
  117. self._last_edit_date = datetime.now()
  118. self._TAIPY_PROPERTIES.update(
  119. {
  120. self.__COLLECTION_KEY,
  121. self.__DB_NAME_KEY,
  122. self._CUSTOM_DOCUMENT_PROPERTY,
  123. self.__DB_USERNAME_KEY,
  124. self.__DB_PASSWORD_KEY,
  125. self.__DB_HOST_KEY,
  126. self.__DB_PORT_KEY,
  127. self.__DB_DRIVER_KEY,
  128. self.__DB_EXTRA_ARGS_KEY,
  129. }
  130. )
  131. @classmethod
  132. def storage_type(cls) -> str:
  133. """Return the storage type of the data node: "mongo_collection"."""
  134. return cls.__STORAGE_TYPE
  135. def filter(self, operators: Union[List, Tuple, None] = None, join_operator=JoinOperator.AND) -> List:
  136. cursor = self._read_by_query(operators, join_operator)
  137. return [self._decoder(row) for row in cursor]
  138. def _read(self):
  139. cursor = self._read_by_query()
  140. return [self._decoder(row) for row in cursor]
  141. def _read_by_query(self, operators: Union[List, Tuple, None] = None, join_operator=JoinOperator.AND):
  142. """Query from a Mongo collection, exclude the _id field"""
  143. if not operators:
  144. return self.collection.find()
  145. if not isinstance(operators, List):
  146. operators = [operators]
  147. conditions = []
  148. for key, value, operator in operators:
  149. if operator == Operator.EQUAL:
  150. conditions.append({key: value})
  151. elif operator == Operator.NOT_EQUAL:
  152. conditions.append({key: {"$ne": value}})
  153. elif operator == Operator.GREATER_THAN:
  154. conditions.append({key: {"$gt": value}})
  155. elif operator == Operator.GREATER_OR_EQUAL:
  156. conditions.append({key: {"$gte": value}})
  157. elif operator == Operator.LESS_THAN:
  158. conditions.append({key: {"$lt": value}})
  159. elif operator == Operator.LESS_OR_EQUAL:
  160. conditions.append({key: {"$lte": value}})
  161. query = {}
  162. if join_operator == JoinOperator.AND:
  163. query = {"$and": conditions}
  164. elif join_operator == JoinOperator.OR:
  165. query = {"$or": conditions}
  166. else:
  167. raise NotImplementedError(f"Join operator {join_operator} is not supported.")
  168. return self.collection.find(query)
  169. def _append(self, data) -> None:
  170. """Append data to a Mongo collection."""
  171. if not isinstance(data, list):
  172. data = [data]
  173. if len(data) == 0:
  174. return
  175. if isinstance(data[0], dict):
  176. self._insert_dicts(data)
  177. else:
  178. self._insert_dicts([self._encoder(row) for row in data])
  179. def _write(self, data) -> None:
  180. """Check data against a collection of types to handle insertion on the database.
  181. Arguments:
  182. data (Any): the data to write to the database.
  183. """
  184. if not isinstance(data, list):
  185. data = [data]
  186. if len(data) == 0:
  187. self.collection.drop()
  188. return
  189. if isinstance(data[0], dict):
  190. self._insert_dicts(data, drop=True)
  191. else:
  192. self._insert_dicts([self._encoder(row) for row in data], drop=True)
  193. def _insert_dicts(self, data: List[Dict], drop=False) -> None:
  194. """
  195. This method will insert data contained in a list of dictionaries into a collection.
  196. Arguments:
  197. data (List[Dict]): a list of dictionaries
  198. drop (bool): drop the collection before inserting the data to overwrite the data in the collection.
  199. """
  200. if drop:
  201. self.collection.drop()
  202. self.collection.insert_many(data)
  203. def _check_custom_document(self, custom_document):
  204. if not isclass(custom_document):
  205. raise InvalidCustomDocument(
  206. f"Invalid custom document of {custom_document}. Only custom class are supported."
  207. )
  208. def _default_decoder(self, document: Dict) -> Any:
  209. """Decode a Mongo dictionary to a custom document object for reading.
  210. Arguments:
  211. document (Dict): the document dictionary return by Mongo query.
  212. Returns:
  213. A custom document object.
  214. """
  215. return self.custom_document(**document)
  216. def _default_encoder(self, document_object: Any) -> Dict:
  217. """Encode a custom document object to a dictionary for writing to MongoDB.
  218. Arguments:
  219. document_object: the custom document class.
  220. Returns:
  221. The document dictionary.
  222. """
  223. return document_object.__dict__