data_node.py

# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import functools
import os
import uuid
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import networkx as nx

from taipy.config.common._validate_id import _validate_id
from taipy.config.common.scope import Scope
from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
from .._entity._properties import _Properties
from .._entity._ready_to_run_property import _ReadyToRunProperty
from .._entity._reload import _Reloader, _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..common._warnings import _warn_deprecated
from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
from ..job.job_id import JobId
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ._filter import _FilterDataNode
from .data_node_id import DataNodeId, Edit
from .operator import JoinOperator


def _update_ready_for_reading(fct):
    # This decorator must be wrapped before the _self_setter decorator, as _self_setter will run the function twice.

    @functools.wraps(fct)
    def _recompute_is_ready_for_reading(dn: "DataNode", *args, **kwargs):
        fct(dn, *args, **kwargs)
        if dn._edit_in_progress:
            _ReadyToRunProperty._add(dn, f"DataNode {dn.id} is being edited")
        else:
            _ReadyToRunProperty._remove(dn, f"DataNode {dn.id} is being edited")
        if not dn._last_edit_date:
            _ReadyToRunProperty._add(dn, f"DataNode {dn.id} is not written")
        else:
            _ReadyToRunProperty._remove(dn, f"DataNode {dn.id} is not written")

    return _recompute_is_ready_for_reading
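

# A brief illustration (comment only) of the intended stacking order: on a property
# setter, `_update_ready_for_reading` sits above `_self_setter`, so the readiness
# flags are recomputed once, after the reloaded setter has run, as the setters
# later in this file do:
#
#     @last_edit_date.setter
#     @_update_ready_for_reading
#     @_self_setter(_MANAGER_NAME)
#     def last_edit_date(self, val):
#         self._last_edit_date = val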


class DataNode(_Entity, _Labeled):
    """Reference to a dataset.

    A Data Node is an abstract class that holds metadata related to the dataset it refers to.
    In particular, a data node holds the name, the scope, the owner identifier, the last
    edit date, and some additional properties of the data.<br/>
    A Data Node also contains information and methods needed to access the dataset. This
    information depends on the type of storage, and it is held by subclasses (such as
    SQL Data Node, CSV Data Node, ...).

    !!! note
        It is recommended not to instantiate subclasses of `DataNode` directly.

    Attributes:
        config_id (str): Identifier of the data node configuration. It must be a valid Python
            identifier.
        scope (Scope^): The scope of this data node.
        id (str): The unique identifier of this data node.
        name (str): A user-readable name of this data node.
        owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
            None.
        parent_ids (Optional[Set[str]]): The set of identifiers of the parent tasks.
        last_edit_date (datetime): The date and time of the last modification.
        edits (List[Edit^]): The list of Edits (an alias for dict) containing metadata about each
            data edition including but not limited to timestamp, comments, job_id:
            timestamp: The time instant of the writing.
            comments: Representation of a free text to explain or comment on a data change.
            job_id: Only populated when the data node is written by a task execution and
                corresponds to the job's id.
            Additional metadata related to the edition made to the data node can also be
            provided in Edits.
        version (str): The string indicates the application version of the data node to
            instantiate. If not provided, the current version is used.
        validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
            which the data node can be considered up-to-date. Once the validity period has passed, the data node is
            considered stale and relevant tasks will run even if they are skippable (see the
            [Task management page](../core/entities/task-mgt.md) for more details).
            If _validity_period_ is set to `None`, the data node is always up-to-date.
        edit_in_progress (bool): True if the data node is locked for modification. False
            otherwise.
        editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
        editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
        kwargs: A dictionary of additional properties.
    """

    _ID_PREFIX = "DATANODE"
    __ID_SEPARATOR = "_"
    __logger = _TaipyLogger._get_logger()
    _REQUIRED_PROPERTIES: List[str] = []
    _MANAGER_NAME: str = "data"
    _PATH_KEY = "path"
    __EDIT_TIMEOUT = 30

    _TAIPY_PROPERTIES: Set[str] = set()

    def __init__(
        self,
        config_id,
        scope: Scope = Scope(Scope.SCENARIO),  # noqa: B008
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        **kwargs,
    ) -> None:
        self._config_id = _validate_id(config_id)
        self.id = id or self._new_id(self._config_id)
        self._owner_id = owner_id
        self._parent_ids = parent_ids or set()
        self._scope = scope
        self._last_edit_date: Optional[datetime] = last_edit_date
        self._edit_in_progress = edit_in_progress
        self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
        self._validity_period = validity_period
        self._editor_id: Optional[str] = editor_id
        self._editor_expiration_date: Optional[datetime] = editor_expiration_date

        # Track edits
        self._edits = edits or []

        self._properties = _Properties(self, **kwargs)

    @staticmethod
    def _new_id(config_id: str) -> DataNodeId:
        """Generate a unique datanode identifier."""
        return DataNodeId(
            DataNode.__ID_SEPARATOR.join([DataNode._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())])
        )
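
    # A minimal illustration (comment only) of the identifier produced by `_new_id`,
    # assuming a hypothetical config id of "sales_history"; the trailing UUID4 part
    # is random:
    #
    #     "DATANODE_sales_history_9f1c2a4e-7c7b-4d7a-9f58-0a2b6f9f3c1d"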

    @property
    def config_id(self):
        return self._config_id

    @property
    def owner_id(self):
        return self._owner_id

    def get_parents(self):
        """Get all parents of this data node."""
        from ... import core as tp

        return tp.get_parents(self)

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def parent_ids(self):
        """List of parent ids of this data node."""
        return self._parent_ids

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edits(self):
        """Get all `Edit^`s of this data node."""
        return self._edits

    def get_last_edit(self) -> Optional[Edit]:
        """Get last `Edit^` of this data node.

        Returns:
            None if there has been no `Edit^` on this data node.
        """
        return self._edits[-1] if self._edits else None

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def last_edit_date(self):
        last_modified_datetime = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None))
        if last_modified_datetime and last_modified_datetime > self._last_edit_date:
            return last_modified_datetime
        else:
            return self._last_edit_date

    @last_edit_date.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def last_edit_date(self, val):
        self._last_edit_date = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def scope(self):
        return self._scope

    @scope.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def scope(self, val):
        self._scope = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def validity_period(self) -> Optional[timedelta]:
        return self._validity_period if self._validity_period else None

    @validity_period.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def validity_period(self, val):
        self._validity_period = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def expiration_date(self) -> datetime:
        """Datetime instant of the expiration date of this data node."""
        last_edit_date = self.last_edit_date
        validity_period = self._validity_period

        if not last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")

        return last_edit_date + validity_period if validity_period else last_edit_date

    @property  # type: ignore
    def name(self) -> Optional[str]:
        return self.properties.get("name")

    @name.setter  # type: ignore
    def name(self, val):
        self.properties["name"] = val

    @property
    def version(self):
        return self._version

    @property
    def cacheable(self):
        """Deprecated. Use `skippable` attribute of a `Task^` instead."""
        _warn_deprecated("cacheable", suggest="the skippable feature")
        return self.properties.get("cacheable", False)

    @cacheable.setter
    def cacheable(self, val):
        _warn_deprecated("cacheable", suggest="the skippable feature")

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edit_in_progress(self):
        return self._edit_in_progress

    @edit_in_progress.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def edit_in_progress(self, val):
        self._edit_in_progress = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_id(self):
        return self._editor_id

    @editor_id.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_id(self, val):
        self._editor_id = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_expiration_date(self):
        return self._editor_expiration_date

    @editor_expiration_date.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_expiration_date(self, val):
        self._editor_expiration_date = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def job_ids(self):
        """List of the jobs having edited this data node."""
        return [edit.get("job_id") for edit in self.edits if edit.get("job_id")]

    @property
    def properties(self):
        """Dictionary of custom properties."""
        self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
        return self._properties

    def _get_user_properties(self) -> Dict[str, Any]:
        """Get user properties."""
        return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}

    def __eq__(self, other):
        return isinstance(other, DataNode) and self.id == other.id

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(self.id)

    def __getstate__(self):
        return vars(self)

    def __setstate__(self, state):
        vars(self).update(state)

    def __getattr__(self, attribute_name):
        protected_attribute_name = _validate_id(attribute_name)
        if protected_attribute_name in self._properties:
            return self._properties[protected_attribute_name]
        raise AttributeError(f"{attribute_name} is not an attribute of data node {self.id}")

    @classmethod
    def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:
        if path and os.path.isfile(path):
            return datetime.fromtimestamp(os.path.getmtime(path))

        last_modified_datetime = None
        if path and os.path.isdir(path):
            for filename in os.listdir(path):
                filepath = os.path.join(path, filename)
                if os.path.isfile(filepath):
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                    if last_modified_datetime is None or file_mtime > last_modified_datetime:
                        last_modified_datetime = file_mtime

        return last_modified_datetime

    @classmethod
    @abstractmethod
    def storage_type(cls) -> str:
        raise NotImplementedError

    def read_or_raise(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node.

        Raises:
            NoData^: If the data has not been written yet.
        """
        if not self.last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
        return self._read()

    def read(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node. None if the data has not been written yet.
        """
        try:
            return self.read_or_raise()
        except NoData:
            self.__logger.warning(
                f"Data node {self.id} from config {self.config_id} is being read but has never been written."
            )
            return None

    def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Append some data to this data node.

        Parameters:
            data (Any): The data to write to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory

        self._append(data)
        self.track_edit(job_id=job_id, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)

    def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Write some data to this data node.

        Parameters:
            data (Any): The data to write to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory

        self._write(data)
        self.track_edit(job_id=job_id, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)
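
    # Illustrative usage (comment only, so nothing runs at import time). A minimal
    # sketch assuming `dn` is a concrete data node instance (for example a pickle or
    # CSV data node) obtained through the usual Taipy configuration flow:
    #
    #     dn.write([1, 2, 3], comments="initial load")  # writes, tracks an Edit, unlocks the node
    #     dn.read()                                     # -> the data that was written, or None if never written
    #     dn.append([4, 5])                             # only for subclasses that implement _append
    #     dn.read_or_raise()                            # raises NoData if the node was never written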

    def track_edit(self, **options):
        """Creates and adds a new entry in the edits attribute without writing the data.

        Parameters:
            options (dict[str, any]): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
                use options to attach any information to an external edit of a data node.
        """
        edit = {k: v for k, v in options.items() if v is not None}
        if "timestamp" not in edit:
            edit["timestamp"] = (
                self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None)) or datetime.now()
            )
        self.last_edit_date = edit.get("timestamp")
        self._edits.append(edit)
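
    # Illustrative usage (comment only): recording an edit that happened outside Taipy,
    # e.g. a file refreshed by an external process. The `source` key is a hypothetical
    # user-custom option; only `timestamp`, `comments`, and `job_id` are treated specially.
    #
    #     dn.track_edit(comments="refreshed by nightly export", source="warehouse")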

    def lock_edit(self, editor_id: Optional[str] = None):
        """Lock the data node modification.

        Note:
            The data node can be unlocked with the method `(DataNode.)unlock_edit()^`.

        Parameters:
            editor_id (Optional[str]): The editor's identifier.
        """
        if editor_id:
            if (
                self.edit_in_progress
                and self.editor_id != editor_id
                and self.editor_expiration_date
                and self.editor_expiration_date > datetime.now()
            ):
                raise DataNodeIsBeingEdited(self.id, self._editor_id)
            self.editor_id = editor_id  # type: ignore
            self.editor_expiration_date = datetime.now() + timedelta(minutes=self.__EDIT_TIMEOUT)  # type: ignore
        else:
            self.editor_id = None  # type: ignore
            self.editor_expiration_date = None  # type: ignore
        self.edit_in_progress = True  # type: ignore

    def unlock_edit(self, editor_id: Optional[str] = None):
        """Unlocks the data node modification.

        Note:
            The data node can be locked with the method `(DataNode.)lock_edit()^`.

        Parameters:
            editor_id (Optional[str]): The editor's identifier.
        """
        if (
            editor_id
            and self.editor_id != editor_id
            and self.editor_expiration_date
            and self.editor_expiration_date > datetime.now()
        ):
            raise DataNodeIsBeingEdited(self.id, self._editor_id)

        self.editor_id = None
        self.editor_expiration_date = None
        self.edit_in_progress = False
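
    # Illustrative editor-lock protocol (comment only), assuming `dn` is a concrete
    # data node and "alice"/"bob" are hypothetical editor identifiers:
    #
    #     dn.lock_edit(editor_id="alice")    # alice holds the lock for __EDIT_TIMEOUT minutes
    #     dn.lock_edit(editor_id="bob")      # raises DataNodeIsBeingEdited while alice's lock is valid
    #     dn.unlock_edit(editor_id="bob")    # also raises DataNodeIsBeingEdited
    #     dn.unlock_edit(editor_id="alice")  # releases the lock and clears edit_in_progress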

    def filter(self, operators: Union[List, Tuple], join_operator=JoinOperator.AND):
        """Read and filter the data referenced by this data node.

        The data is filtered by the provided list of 3-tuples (key, value, `Operator^`).

        If multiple filter operators are provided, filtered data will be joined based on the
        join operator (*AND* or *OR*).

        Parameters:
            operators (Union[List[Tuple], Tuple]): A 3-element tuple or a list of 3-element tuples,
                each is in the form of (key, value, `Operator^`).
            join_operator (JoinOperator^): The operator used to join the multiple filter
                3-tuples.

        Returns:
            The filtered data.

        Raises:
            NotImplementedError: If the data type is not supported.
        """
        data = self._read()
        return _FilterDataNode._filter(data, operators, join_operator)

    def __getitem__(self, item):
        data = self._read()
        return _FilterDataNode._filter_by_key(data, item)
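
    # Illustrative filtering (comment only), assuming the node holds tabular data with a
    # hypothetical "rating" column and that `Operator` is imported from the sibling
    # `operator` module:
    #
    #     from .operator import JoinOperator, Operator
    #
    #     dn.filter([("rating", 4, Operator.GREATER_OR_EQUAL)])
    #     dn.filter(
    #         [("rating", 4, Operator.GREATER_OR_EQUAL), ("rating", 8, Operator.LESS_THAN)],
    #         join_operator=JoinOperator.AND,
    #     )
    #     dn["rating"]  # column access via __getitem__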

    @abstractmethod
    def _read(self):
        raise NotImplementedError

    def _append(self, data):
        raise NotImplementedError

    @abstractmethod
    def _write(self, data):
        raise NotImplementedError

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_ready_for_reading(self) -> bool:
        """Indicate if this data node is ready for reading.

        Returns:
            False if the data is locked for modification or if the data has never been written.
            True otherwise.
        """
        if self._edit_in_progress:
            return False
        if not self._last_edit_date:
            # Never been written so it is not up-to-date
            return False
        return True

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_valid(self) -> bool:
        """Indicate if this data node is valid.

        Returns:
            False if the data has never been written or the expiration date has passed.<br/>
            True otherwise.
        """
        if not self._last_edit_date:
            # Never been written so it is not valid
            return False
        if not self._validity_period:
            # No validity period and has already been written, so it is valid
            return True
        if datetime.now() > self.expiration_date:
            # expiration_date has been passed
            return False
        return True

    @property
    def is_up_to_date(self) -> bool:
        """Indicate if this data node is up-to-date.

        Returns:
            False if a preceding data node has been updated after this data node was last
            written, or if this data node is invalid.<br/>
            True otherwise.
        """
        from ..scenario.scenario import Scenario
        from ..taipy import get_parents

        parent_scenarios: Set[Scenario] = get_parents(self)["scenario"]  # type: ignore
        for parent_scenario in parent_scenarios:
            for ancestor_node in nx.ancestors(parent_scenario._build_dag(), self):
                if (
                    isinstance(ancestor_node, DataNode)
                    and ancestor_node.last_edit_date
                    and ancestor_node.last_edit_date > self.last_edit_date
                ):
                    return False
        return self.is_valid

    @staticmethod
    def _class_map():
        def all_subclasses(cls):
            subclasses = set(cls.__subclasses__())
            for s in cls.__subclasses__():
                subclasses.update(all_subclasses(s))
            return subclasses

        class_map = {}
        for c in all_subclasses(DataNode):
            try:
                if c.storage_type() is not None:
                    class_map[c.storage_type()] = c
            except NotImplementedError:
                pass

        return class_map

    def get_label(self) -> str:
        """Returns the data node simple label prefixed by its owner label.

        Returns:
            The label of the data node as a string.
        """
        return self._get_label()

    def get_simple_label(self) -> str:
        """Returns the data node simple label.

        Returns:
            The simple label of the data node as a string.
        """
        return self._get_simple_label()
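

# A minimal sketch (kept as a comment: a real class defined here would be picked up
# by `_class_map()` through `__subclasses__`) of what a concrete subclass provides.
# The name `InMemoryListDataNode` and its storage_type string are hypothetical:
#
#     class InMemoryListDataNode(DataNode):
#         @classmethod
#         def storage_type(cls) -> str:
#             return "in_memory_list"
#
#         def _read(self):
#             return self.properties.get("data", [])
#
#         def _write(self, data):
#             self.properties["data"] = data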


@_make_event.register(DataNode)
def _make_event_for_datanode(
    data_node: DataNode,
    operation: EventOperation,
    /,
    attribute_name: Optional[str] = None,
    attribute_value: Optional[Any] = None,
    **kwargs,
) -> Event:
    metadata = {"config_id": data_node.config_id, "version": data_node._version, **kwargs}
    return Event(
        entity_type=EventEntityType.DATA_NODE,
        entity_id=data_node.id,
        operation=operation,
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        metadata=metadata,
    )