data_node.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import uuid
  13. from abc import abstractmethod
  14. from datetime import datetime, timedelta
  15. from typing import Any, Dict, List, Optional, Set, Tuple, Union
  16. import networkx as nx
  17. from taipy.config.common._validate_id import _validate_id
  18. from taipy.config.common.scope import Scope
  19. from taipy.logger._taipy_logger import _TaipyLogger
  20. from .._entity._entity import _Entity
  21. from .._entity._labeled import _Labeled
  22. from .._entity._properties import _Properties
  23. from .._entity._reload import _Reloader, _self_reload, _self_setter
  24. from .._version._version_manager_factory import _VersionManagerFactory
  25. from ..common._warnings import _warn_deprecated
  26. from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
  27. from ..job.job_id import JobId
  28. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  29. from ._filter import _FilterDataNode
  30. from .data_node_id import DataNodeId, Edit
  31. from .operator import JoinOperator
  class DataNode(_Entity, _Labeled):
      """Reference to a dataset.

      A Data Node is an abstract class that holds metadata related to the dataset it refers to.
      In particular, a data node holds the name, the scope, the owner identifier, the last
      edit date, and some additional properties of the data.<br/>
      A Data Node also contains information and methods needed to access the dataset. This
      information depends on the type of storage, and it is held by subclasses (such as
      SQL Data Node, CSV Data Node, ...).

      !!! note
          It is recommended not to instantiate subclasses of `DataNode` directly.

      Attributes:
          config_id (str): Identifier of the data node configuration. It must be a valid Python
              identifier.
          scope (Scope^): The scope of this data node.
          id (str): The unique identifier of this data node.
          name (str): A user-readable name of this data node.
          owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
              None.
          parent_ids (Optional[Set[str]]): The set of identifiers of the parent tasks.
          last_edit_date (datetime): The date and time of the last modification.
          edits (List[Edit^]): The list of Edits (an alias for dict) containing metadata about each
              data edition including but not limited to timestamp, comments, job_id:
                  timestamp: The time instant of the writing
                  comments: Representation of a free text to explain or comment on a data change
                  job_id: Only populated when the data node is written by a task execution and
                      corresponds to the job's id.
              Additional metadata related to the edition made to the data node can also be provided in Edits.
          version (str): The string indicates the application version of the data node to
              instantiate. If not provided, the current version is used.
          validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
              which the data node can be considered up-to-date. Once the validity period has passed, the data node is
              considered stale and relevant tasks will run even if they are skippable (see the
              [Task management page](../core/entities/task-mgt.md) for more details).
              If _validity_period_ is set to `None`, the data node is always up-to-date.
          edit_in_progress (bool): True if the data node is locked for modification. False
              otherwise.
          editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
          editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
          kwargs: A dictionary of additional properties.
      """

      _ID_PREFIX = "DATANODE"
      __ID_SEPARATOR = "_"
      __logger = _TaipyLogger._get_logger()
      _REQUIRED_PROPERTIES: List[str] = []
      _MANAGER_NAME = "data"
      __PATH_KEY = "path"
      # Lifetime of an editor lock, in minutes (see `lock_edit()`).
      __EDIT_TIMEOUT = 30
      _TAIPY_PROPERTIES: Set[str] = set()

      def __init__(
          self,
          config_id,
          scope: Scope = Scope(Scope.SCENARIO),  # noqa: B008
          id: Optional[DataNodeId] = None,
          owner_id: Optional[str] = None,
          parent_ids: Optional[Set[str]] = None,
          last_edit_date: Optional[datetime] = None,
          edits: Optional[List[Edit]] = None,
          version: Optional[str] = None,
          validity_period: Optional[timedelta] = None,
          edit_in_progress: bool = False,
          editor_id: Optional[str] = None,
          editor_expiration_date: Optional[datetime] = None,
          **kwargs,
      ):
          """Initialize the metadata of the data node.

          When `id` is not provided, a new identifier is built from the id prefix, the
          configuration id and a random UUID. When `version` is not provided, the latest
          version known by the version manager is used. Any extra keyword argument is stored
          as a property of the data node.
          """
          self._config_id = _validate_id(config_id)
          self.id = id or DataNodeId(self.__ID_SEPARATOR.join([self._ID_PREFIX, self.config_id, str(uuid.uuid4())]))
          self._owner_id = owner_id
          self._parent_ids = parent_ids or set()
          self._scope = scope
          self._last_edit_date = last_edit_date
          self._edit_in_progress = edit_in_progress
          self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
          self._validity_period = validity_period
          self._editor_id: Optional[str] = editor_id
          self._editor_expiration_date: Optional[datetime] = editor_expiration_date

          # Track edits
          self._edits = edits or list()

          self._properties = _Properties(self, **kwargs)

      @property
      def config_id(self):
          """Identifier of the data node configuration."""
          return self._config_id

      @property
      def owner_id(self):
          """Identifier of the owner of this data node, or None."""
          return self._owner_id

      def get_parents(self):
          """Get all parents of this data node."""
          # Imported locally, as in the rest of this class, rather than at module level.
          from ... import core as tp

          return tp.get_parents(self)

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def parent_ids(self):
          """List of parent ids of this data node."""
          return self._parent_ids

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def edits(self):
          """Get all `Edit^`s of this data node."""
          return self._edits

      def get_last_edit(self) -> Optional[Edit]:
          """Get last `Edit^` of this data node.

          Returns:
              None if there has been no `Edit^` on this data node.
          """
          if self._edits:
              return self._edits[-1]
          return None

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def last_edit_date(self):
          """The date and time of the last modification.

          When the `path` property points to an existing file or directory, the most recent
          file modification time is returned if it is later than the recorded last edit date.

          Returns:
              The most recent of the recorded last edit date and the file modification time,
              or None if neither is available.
          """
          last_modified_datetime = self.__get_last_modified_datetime()
          # Guard against a missing recorded date: comparing a datetime with None raises
          # a TypeError. If only the file modification time is known, it is returned.
          if last_modified_datetime and (
              self._last_edit_date is None or last_modified_datetime > self._last_edit_date
          ):
              return last_modified_datetime
          return self._last_edit_date

      @last_edit_date.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def last_edit_date(self, val):
          self._last_edit_date = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def scope(self):
          """The scope of this data node."""
          return self._scope

      @scope.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def scope(self, val):
          self._scope = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def validity_period(self) -> Optional[timedelta]:
          """The validity period of this data node, or None if it is unset or zero."""
          return self._validity_period if self._validity_period else None

      @validity_period.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def validity_period(self, val):
          self._validity_period = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def expiration_date(self) -> datetime:
          """Datetime instant of the expiration date of this data node.

          Returns:
              The last edit date plus the validity period, or the last edit date itself when
              no validity period is set.

          Raises:
              NoData^: If the data node has no last edit date.
          """
          last_edit_date = self.last_edit_date
          validity_period = self._validity_period

          if not last_edit_date:
              raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")

          return last_edit_date + validity_period if validity_period else last_edit_date

      @property  # type: ignore
      def name(self) -> Optional[str]:
          """A user-readable name of this data node, stored in the `name` property."""
          return self.properties.get("name")

      @name.setter  # type: ignore
      def name(self, val):
          self.properties["name"] = val

      @property
      def version(self):
          """The application version of this data node."""
          return self._version

      @property
      def cacheable(self):
          """Deprecated. Use `skippable` attribute of a `Task^` instead."""
          _warn_deprecated("cacheable", suggest="the skippable feature")
          return self.properties.get("cacheable", False)

      @cacheable.setter
      def cacheable(self, val):
          # The setter only emits the deprecation warning: the value is not stored.
          _warn_deprecated("cacheable", suggest="the skippable feature")

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def edit_in_progress(self):
          """True if the data node is locked for modification. False otherwise."""
          return self._edit_in_progress

      @edit_in_progress.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def edit_in_progress(self, val):
          self._edit_in_progress = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def editor_id(self):
          """The identifier of the editor currently holding the lock, or None."""
          return self._editor_id

      @editor_id.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def editor_id(self, val):
          self._editor_id = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def editor_expiration_date(self):
          """The expiration date of the editor lock, or None."""
          return self._editor_expiration_date

      @editor_expiration_date.setter  # type: ignore
      @_self_setter(_MANAGER_NAME)
      def editor_expiration_date(self, val):
          self._editor_expiration_date = val

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def job_ids(self):
          """List of the jobs having edited this data node."""
          return [edit.get("job_id") for edit in self.edits if edit.get("job_id")]

      @property
      def properties(self):
          """Dictionary of custom properties."""
          # The local copy is refreshed from the reloaded entity on every access.
          self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
          return self._properties

      def _get_user_properties(self) -> Dict[str, Any]:
          """Get user properties."""
          return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}

      def __eq__(self, other):
          """Two data nodes are equal when they share the same identifier.

          Comparing with an object that is not a `DataNode` returns False instead of raising
          an `AttributeError` on the missing `id` attribute.
          """
          return isinstance(other, DataNode) and self.id == other.id

      def __ne__(self, other):
          return not self == other

      def __hash__(self):
          # Consistent with `__eq__`: both rely on the identifier only.
          return hash(self.id)

      def __getstate__(self):
          return vars(self)

      def __setstate__(self, state):
          vars(self).update(state)

      def __getattr__(self, attribute_name):
          """Expose the properties of the data node as attributes.

          Only called when the regular attribute lookup fails.

          Raises:
              AttributeError: If no property matches the attribute name.
          """
          # NOTE(review): `_validate_id` may itself raise for names it rejects - confirm.
          protected_attribute_name = _validate_id(attribute_name)
          if protected_attribute_name in self._properties:
              return self._properties[protected_attribute_name]
          raise AttributeError(f"{attribute_name} is not an attribute of data node {self.id}")

      def __get_last_modified_datetime(self) -> Optional[datetime]:
          """Get the modification time of the file(s) referenced by the `path` property.

          Returns:
              The modification time of the file if the path is a file, the latest modification
              time among the files directly inside it if the path is a directory (subdirectories
              are not visited), and None otherwise.
          """
          path = self._properties.get(self.__PATH_KEY, None)
          if path and os.path.isfile(path):
              return datetime.fromtimestamp(os.path.getmtime(path))

          last_modified_datetime = None
          if path and os.path.isdir(path):
              for filename in os.listdir(path):
                  filepath = os.path.join(path, filename)
                  if os.path.isfile(filepath):
                      file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                      if last_modified_datetime is None or file_mtime > last_modified_datetime:
                          last_modified_datetime = file_mtime

          return last_modified_datetime

      @classmethod
      @abstractmethod
      def storage_type(cls) -> str:
          """Return the storage type of the data node class. Must be implemented by subclasses."""
          raise NotImplementedError

      def read_or_raise(self) -> Any:
          """Read the data referenced by this data node.

          Returns:
              The data referenced by this data node.

          Raises:
              NoData^: If the data has not been written yet.
          """
          if not self.last_edit_date:
              raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
          return self._read()

      def read(self) -> Any:
          """Read the data referenced by this data node.

          Returns:
              The data referenced by this data node. None if the data has not been written yet.
          """
          try:
              return self.read_or_raise()
          except NoData:
              self.__logger.warning(
                  f"Data node {self.id} from config {self.config_id} is being read but has never been "
                  f"written."
              )
              return None

      def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
          """Append some data to this data node.

          The edit is tracked, the data node is unlocked, and it is then saved through the
          data manager.

          Parameters:
              data (Any): The data to write to this data node.
              job_id (JobId^): An optional identifier of the writer.
              **kwargs (dict[str, any]): Extra information to attach to the edit document
                  corresponding to this write.
          """
          from ._data_manager_factory import _DataManagerFactory

          self._append(data)
          self.track_edit(job_id=job_id, **kwargs)
          self.unlock_edit()
          _DataManagerFactory._build_manager()._set(self)

      def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
          """Write some data to this data node.

          The edit is tracked, the data node is unlocked, and it is then saved through the
          data manager.

          Parameters:
              data (Any): The data to write to this data node.
              job_id (JobId^): An optional identifier of the writer.
              **kwargs (dict[str, any]): Extra information to attach to the edit document
                  corresponding to this write.
          """
          from ._data_manager_factory import _DataManagerFactory

          self._write(data)
          self.track_edit(job_id=job_id, **kwargs)
          self.unlock_edit()
          _DataManagerFactory._build_manager()._set(self)

      def track_edit(self, **options):
          """Creates and adds a new entry in the edits attribute without writing the data.

          Options whose value is None are ignored. When no `timestamp` is provided, the
          current date and time is used. The last edit date is set to the edit's timestamp.

          Parameters:
              options (dict[str, any]): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
                  use options to attach any information to an external edit of a data node.
          """
          edit = {key: value for key, value in options.items() if value is not None}
          if "timestamp" not in edit:
              edit["timestamp"] = datetime.now()
          self.last_edit_date = edit.get("timestamp")
          self._edits.append(edit)

      def lock_edit(self, editor_id: Optional[str] = None):
          """Lock the data node modification.

          When an editor identifier is provided, it is recorded together with an expiration
          date for the lock. Otherwise, any recorded editor information is cleared. In both
          cases the data node is flagged as being edited.

          Note:
              The data node can be unlocked with the method `(DataNode.)unlock_edit()^`.

          Parameters:
              editor_id (Optional[str]): The editor's identifier.

          Raises:
              DataNodeIsBeingEdited^: If an editor identifier is provided while the data node
                  is being edited by a different editor whose lock has not expired yet.
          """
          if editor_id:
              if (
                  self.edit_in_progress
                  and self.editor_id != editor_id
                  and self.editor_expiration_date
                  and self.editor_expiration_date > datetime.now()
              ):
                  raise DataNodeIsBeingEdited(self.id, self._editor_id)
              self.editor_id = editor_id  # type: ignore
              self.editor_expiration_date = datetime.now() + timedelta(minutes=self.__EDIT_TIMEOUT)  # type: ignore
          else:
              self.editor_id = None  # type: ignore
              self.editor_expiration_date = None  # type: ignore
          self.edit_in_progress = True  # type: ignore

      def unlock_edit(self, editor_id: Optional[str] = None):
          """Unlocks the data node modification.

          Note:
              The data node can be locked with the method `(DataNode.)lock_edit()^`.

          Parameters:
              editor_id (Optional[str]): The editor's identifier.

          Raises:
              DataNodeIsBeingEdited^: If an editor identifier is provided that differs from the
                  current editor while the current editor's lock has not expired yet.
          """
          if (
              editor_id
              and self.editor_id != editor_id
              and self.editor_expiration_date
              and self.editor_expiration_date > datetime.now()
          ):
              raise DataNodeIsBeingEdited(self.id, self._editor_id)
          else:
              self.editor_id = None  # type: ignore
              self.editor_expiration_date = None  # type: ignore
              self.edit_in_progress = False  # type: ignore

      def filter(self, operators: Union[List, Tuple], join_operator=JoinOperator.AND):
          """Read and filter the data referenced by this data node.

          The data is filtered by the provided list of 3-tuples (key, value, `Operator^`).

          If multiple filter operators are provided, filtered data will be joined based on the
          join operator (*AND* or *OR*).

          Parameters:
              operators (Union[List[Tuple], Tuple]): A 3-element tuple or a list of 3-element tuples,
                  each is in the form of (key, value, `Operator^`).
              join_operator (JoinOperator^): The operator used to join the multiple filter
                  3-tuples.

          Returns:
              The filtered data.

          Raises:
              NotImplementedError: If the data type is not supported.
          """
          data = self._read()
          return _FilterDataNode._filter(data, operators, join_operator)

      def __getitem__(self, item):
          """Read the data and filter it by the provided key."""
          data = self._read()
          return _FilterDataNode._filter_by_key(data, item)

      @abstractmethod
      def _read(self):
          """Read the data from the storage. Must be implemented by subclasses."""
          raise NotImplementedError

      def _append(self, data):
          """Append data to the storage. Raises NotImplementedError unless overridden."""
          raise NotImplementedError

      @abstractmethod
      def _write(self, data):
          """Write data to the storage. Must be implemented by subclasses."""
          raise NotImplementedError

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def is_ready_for_reading(self) -> bool:
          """Indicate if this data node is ready for reading.

          Returns:
              False if the data is locked for modification or if the data has never been written.
              True otherwise.
          """
          if self._edit_in_progress:
              return False
          if not self._last_edit_date:
              # Never been written so it is not up-to-date
              return False
          return True

      @property  # type: ignore
      @_self_reload(_MANAGER_NAME)
      def is_valid(self) -> bool:
          """Indicate if this data node is valid.

          Returns:
              False if the data has never been written or the expiration date has passed.<br/>
              True otherwise.
          """
          if not self._last_edit_date:
              # Never been written so it is not valid
              return False
          if not self._validity_period:
              # No validity period and has already been written, so it is valid
              return True
          if datetime.now() > self.expiration_date:
              # expiration_date has been passed
              return False
          return True

      @property
      def is_up_to_date(self) -> bool:
          """Indicate if this data node is up-to-date.

          Returns:
              False if a preceding data node has been updated before the selected data node
              or the selected data is invalid.<br/>
              True otherwise.
          """
          # An invalid data node (never written or expired) cannot be up-to-date. Checking
          # this first also guarantees a last edit date exists before the comparisons below,
          # which would otherwise raise a TypeError against None.
          if not self.is_valid:
              return False

          from ..scenario.scenario import Scenario
          from ..taipy import get_parents

          parent_scenarios: Set[Scenario] = get_parents(self)["scenario"]  # type: ignore
          for parent_scenario in parent_scenarios:
              for ancestor_node in nx.ancestors(parent_scenario._build_dag(), self):
                  if (
                      isinstance(ancestor_node, DataNode)
                      and ancestor_node.last_edit_date
                      and ancestor_node.last_edit_date > self.last_edit_date
                  ):
                      return False
          return True

      @staticmethod
      def _class_map():
          """Build a mapping from storage type to the `DataNode` subclass declaring it.

          Subclasses whose `storage_type()` raises `NotImplementedError` or returns None
          are left out of the mapping.
          """

          def all_subclasses(cls):
              # Recursively collect direct and indirect subclasses.
              subclasses = set(cls.__subclasses__())
              for s in cls.__subclasses__():
                  subclasses.update(all_subclasses(s))
              return subclasses

          class_map = {}
          for c in all_subclasses(DataNode):
              try:
                  storage_type = c.storage_type()
              except NotImplementedError:
                  continue
              if storage_type is not None:
                  class_map[storage_type] = c

          return class_map

      def get_label(self) -> str:
          """Returns the data node simple label prefixed by its owner label.

          Returns:
              The label of the data node as a string.
          """
          return self._get_label()

      def get_simple_label(self) -> str:
          """Returns the data node simple label.

          Returns:
              The simple label of the data node as a string.
          """
          return self._get_simple_label()
  @_make_event.register(DataNode)
  def make_event_for_datanode(
      data_node: DataNode,
      operation: EventOperation,
      /,
      attribute_name: Optional[str] = None,
      attribute_value: Optional[Any] = None,
      **kwargs,
  ) -> Event:
      """Build the `Event` describing an operation on a data node.

      The event metadata holds the configuration id and the version of the data node,
      completed (and possibly overridden) by the extra keyword arguments.
      """
      # Extra keyword arguments are applied last so they take precedence over the defaults.
      metadata = {"config_id": data_node.config_id, "version": data_node._version}
      metadata.update(kwargs)

      event_fields = {
          "entity_type": EventEntityType.DATA_NODE,
          "entity_id": data_node.id,
          "operation": operation,
          "attribute_name": attribute_name,
          "attribute_value": attribute_value,
          "metadata": metadata,
      }
      return Event(**event_fields)