sql_table.py

# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Union

import pandas as pd
from sqlalchemy import MetaData, Table

from taipy.config.common.scope import Scope

from .._version._version_manager_factory import _VersionManagerFactory
from ..exceptions.exceptions import MissingRequiredProperty
from ._abstract_sql import _AbstractSQLDataNode
from .data_node_id import DataNodeId, Edit


class SQLTableDataNode(_AbstractSQLDataNode):
    """Data Node stored in a SQL table.

    Attributes:
        config_id (str): Identifier of the data node configuration. It must be a valid Python
            identifier.
        scope (Scope^): The scope of this data node.
        id (str): The unique identifier of this data node.
        owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
            None.
        parent_ids (Optional[Set[str]]): The identifiers of the parent tasks or `None`.
        last_edit_date (datetime): The date and time of the last modification.
        edits (List[Edit^]): The ordered list of edits for that data node.
        version (str): The string indicating the application version of the data node to instantiate.
            If not provided, the current version is used.
        validity_period (Optional[timedelta]): The duration, as a timedelta, since the last edit date for
            which the data node can be considered up-to-date. Once the validity period has passed, the data node is
            considered stale and relevant tasks will run even if they are skippable (see the
            [Task management page](../core/entities/task-mgt.md) for more details).
            If _validity_period_ is set to `None`, the data node is always up-to-date.
        edit_in_progress (bool): True if a task computing the data node has been submitted
            and not completed yet. False otherwise.
        editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
        editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
        properties (dict[str, Any]): A dictionary of additional properties. Note that the
            _properties_ parameter must at least contain entries for _"db_name"_, _"db_engine"_, and _"table_name"_:

            - _"db_name"_ `(str)`: The database name, or the name of the SQLite database file.
            - _"db_engine"_ `(str)`: The database engine. For now, the accepted values are _"sqlite"_, _"mssql"_,
                _"mysql"_, or _"postgresql"_.
            - _"table_name"_ `(str)`: The name of the SQL table.
            - _"db_username"_ `(str)`: The database username.
            - _"db_password"_ `(str)`: The database password.
            - _"db_host"_ `(str)`: The database host. The default value is _"localhost"_.
            - _"db_port"_ `(int)`: The database port. The default value is 1433.
            - _"db_driver"_ `(str)`: The database driver.
            - _"sqlite_folder_path"_ `(str)`: The path to the folder that contains the SQLite file. The default
                value is the current working folder.
            - _"sqlite_file_extension"_ `(str)`: The filename extension of the SQLite file. The default value
                is ".db".
            - _"db_extra_args"_ `(Dict[str, Any])`: A dictionary of additional arguments to be passed into the
                database connection string.
            - _"exposed_type"_: The exposed type of the data read from the SQL table. The default value is `pandas`.
    """

    __STORAGE_TYPE = "sql_table"
    __TABLE_KEY = "table_name"

    def __init__(
        self,
        config_id: str,
        scope: Scope,
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        properties: Optional[Dict] = None,
    ) -> None:
        if properties is None:
            properties = {}
        if properties.get(self.__TABLE_KEY) is None:
            raise MissingRequiredProperty(f"Property {self.__TABLE_KEY} is required but was not provided.")
        super().__init__(
            config_id,
            scope,
            id=id,
            owner_id=owner_id,
            parent_ids=parent_ids,
            last_edit_date=last_edit_date,
            edits=edits,
            version=version or _VersionManagerFactory._build_manager()._get_latest_version(),
            validity_period=validity_period,
            edit_in_progress=edit_in_progress,
            editor_id=editor_id,
            editor_expiration_date=editor_expiration_date,
            properties=properties,
        )
        self._TAIPY_PROPERTIES.update({self.__TABLE_KEY})

    @classmethod
    def storage_type(cls) -> str:
        return cls.__STORAGE_TYPE

    def _get_base_read_query(self) -> str:
        # Base query used by the abstract SQL data node to read the whole table.
        return f"SELECT * FROM {self.properties[self.__TABLE_KEY]}"

    def _do_append(self, data, engine, connection) -> None:
        # Append: insert the new rows while keeping the existing table content.
        self.__insert_data(data, engine, connection)

    def _do_write(self, data, engine, connection) -> None:
        # Write: replace the table content by deleting all rows before inserting the new data.
        self.__insert_data(data, engine, connection, delete_table=True)

    def __insert_data(self, data, engine, connection, delete_table: bool = False) -> None:
        # Convert the data to a pandas structure according to the exposed type, then insert it
        # into the reflected table.
        table = self._create_table(engine)
        self.__insert_dataframe(
            self._convert_data_to_dataframe(self.properties[self._EXPOSED_TYPE_PROPERTY], data),
            table,
            connection,
            delete_table,
        )

    def _create_table(self, engine) -> Table:
        # Reflect the table schema from the database so SQLAlchemy knows its columns.
        return Table(
            self.properties[self.__TABLE_KEY],
            MetaData(),
            autoload_with=engine,
        )

    @classmethod
    def __insert_dicts(cls, data: List[Dict], table: Any, connection: Any, delete_table: bool) -> None:
        """
        Insert the data contained in a list of dictionaries into a table. The query itself is handled
        by SQLAlchemy, so the caller only needs to pass data of the correct type.
        """
        cls.__delete_all_rows(table, connection, delete_table)
        connection.execute(table.insert(), data)

    @classmethod
    def __insert_dataframe(
        cls, df: Union[pd.DataFrame, pd.Series], table: Any, connection: Any, delete_table: bool
    ) -> None:
        # A Series is inserted as a single row; a DataFrame is inserted as one row per record.
        if isinstance(df, pd.Series):
            data = [df.to_dict()]
        elif isinstance(df, pd.DataFrame):
            data = df.to_dict(orient="records")
        cls.__insert_dicts(data, table, connection, delete_table)

    @classmethod
    def __delete_all_rows(cls, table: Any, connection: Any, delete_table: bool) -> None:
        # Rows are only cleared when delete_table is True, i.e. on write but not on append.
        if delete_table:
            connection.execute(table.delete())
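

# Illustrative sketch: how this data node is typically exercised through the public
# `write`/`append` data node API, assuming a `sales_history` data node built from a
# configuration similar to the one sketched above and an existing `sales` table:
#
#     import pandas as pd
#
#     df = pd.DataFrame([{"name": "Alice", "score": 10}, {"name": "Bob", "score": 12}])
#     sales_history.write(df)   # _do_write: existing rows are deleted, then the new rows are inserted
#     sales_history.append(df)  # _do_append: the new rows are inserted, existing rows are kept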