# Copyright 2021-2024 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import functools
import os
import uuid
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple, Union

import networkx as nx

from taipy.config.common._validate_id import _validate_id
from taipy.config.common.scope import Scope
from taipy.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
from .._entity._properties import _Properties
from .._entity._ready_to_run_property import _ReadyToRunProperty
from .._entity._reload import _Reloader, _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..common._warnings import _warn_deprecated
from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
from ..job.job_id import JobId
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..reason import DataNodeEditInProgress, DataNodeIsNotWritten
from ._filter import _FilterDataNode
from .data_node_id import DataNodeId, Edit
from .operator import JoinOperator


def _update_ready_for_reading(fct):
    # This decorator must be stacked above the _self_setter decorator, because _self_setter
    # runs the wrapped function twice.
    @functools.wraps(fct)
    def _recompute_is_ready_for_reading(dn: "DataNode", *args, **kwargs):
        fct(dn, *args, **kwargs)
        if dn._edit_in_progress:
            _ReadyToRunProperty._add(dn, DataNodeEditInProgress(dn.id))
        else:
            _ReadyToRunProperty._remove(dn, DataNodeEditInProgress(dn.id))
        if not dn._last_edit_date:
            _ReadyToRunProperty._add(dn, DataNodeIsNotWritten(dn.id))
        else:
            _ReadyToRunProperty._remove(dn, DataNodeIsNotWritten(dn.id))

    return _recompute_is_ready_for_reading
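
# Usage sketch (illustrative, mirroring the property setters defined in the class below): the
# decorator is stacked above _self_setter so the readiness flags are recomputed once, after
# the new value has been applied:
#
#     @last_edit_date.setter
#     @_update_ready_for_reading
#     @_self_setter(_MANAGER_NAME)
#     def last_edit_date(self, val):
#         self._last_edit_date = val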


class DataNode(_Entity, _Labeled):
    """Reference to a dataset.

    A Data Node is an abstract class that holds metadata related to the data it refers to.
    In particular, a data node holds the name, the scope, the owner identifier, the last
    edit date, and some additional properties of the data.<br/>
    A Data Node also contains information and methods needed to access the dataset. This
    information depends on the type of storage, and it is held by subclasses (such as
    SQL Data Node, CSV Data Node, ...).

    !!! note
        It is not recommended to instantiate subclasses of `DataNode` directly. Instead,
        you have two ways:

        1. Create a Scenario using the `create_scenario()^` function. Related data nodes
            will be created automatically. Please refer to the `Scenario^` class for more
            information.
        2. Configure a `DataNodeConfig^` with the various configuration methods from `Config^`
            and use the `create_global_data_node()^` function as illustrated in the following
            example.

    A data node's attributes are populated based on its configuration `DataNodeConfig^`.

    !!! Example

        ```python
        import taipy as tp
        from taipy import Config

        if __name__ == "__main__":
            # Configure a global data node
            dataset_cfg = Config.configure_data_node("my_dataset", scope=tp.Scope.GLOBAL)

            # Instantiate a global data node
            dataset = tp.create_global_data_node(dataset_cfg)

            # Retrieve the list of all data nodes
            all_data_nodes = tp.get_data_nodes()

            # Write the data
            dataset.write("Hello, World!")

            # Read the data
            print(dataset.read())
        ```

    Attributes:
        config_id (str): Identifier of the data node configuration. It must be a valid Python
            identifier.
        scope (Scope^): The scope of this data node.
        id (str): The unique identifier of this data node.
        name (str): A user-readable name of this data node.
        owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
            None.
        parent_ids (Optional[Set[str]]): The set of identifiers of the parent tasks.
        last_edit_date (datetime): The date and time of the last modification.
        edits (List[Edit^]): The list of Edits (an alias for dict) containing metadata about each
            data edition including but not limited to:
            <ul><li>timestamp: The time instant of the writing</li>
            <li>comments: Representation of a free text to explain or comment on a data change</li>
            <li>job_id: Only populated when the data node is written by a task execution and
            corresponds to the job's id.</li></ul>
            Additional metadata related to the edition made to the data node can also be provided in Edits.
        version (str): The string that indicates the application version of the data node to
            instantiate. If not provided, the current version is used.
        validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
            which the data node can be considered up-to-date. Once the validity period has passed, the data node is
            considered stale and relevant tasks will run even if they are skippable (see the
            [Task orchestration](../../userman/sdm/task/index.md#task-configuration) page for more details).
            If _validity_period_ is set to `None`, the data node is always up-to-date.
        edit_in_progress (bool): True if the data node is locked for modification. False
            otherwise.
        editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
        editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
        kwargs: A dictionary of additional properties.
    """

    _ID_PREFIX = "DATANODE"
    __ID_SEPARATOR = "_"
    _logger = _TaipyLogger._get_logger()
    _REQUIRED_PROPERTIES: List[str] = []
    _MANAGER_NAME: str = "data"
    _PATH_KEY = "path"
    __EDIT_TIMEOUT = 30

    _TAIPY_PROPERTIES: Set[str] = set()

    def __init__(
        self,
        config_id,
        scope: Scope = Scope(Scope.SCENARIO),  # noqa: B008
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        **kwargs,
    ) -> None:
        self._config_id = _validate_id(config_id)
        self.id = id or self._new_id(self._config_id)
        self._owner_id = owner_id
        self._parent_ids = parent_ids or set()
        self._scope = scope
        self._last_edit_date: Optional[datetime] = last_edit_date
        self._edit_in_progress = edit_in_progress
        self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
        self._validity_period = validity_period
        self._editor_id: Optional[str] = editor_id
        self._editor_expiration_date: Optional[datetime] = editor_expiration_date

        # Track edits
        self._edits: List[Edit] = edits or []

        self._properties: _Properties = _Properties(self, **kwargs)

    @staticmethod
    def _new_id(config_id: str) -> DataNodeId:
        """Generate a unique datanode identifier."""
        return DataNodeId(
            DataNode.__ID_SEPARATOR.join([DataNode._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())])
        )
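
    # Illustrative result (the UUID suffix is random), assuming a config_id of "my_dataset":
    #     "DATANODE_my_dataset_9f6e1c3a-6b0e-4a2a-9c4e-2f9a1b7c5d21"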

    @property
    def config_id(self):
        return self._config_id

    @property
    def owner_id(self):
        return self._owner_id

    def get_parents(self):
        """Get all parents of this data node."""
        from ... import core as tp

        return tp.get_parents(self)

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def parent_ids(self):
        """List of parent ids of this data node."""
        return self._parent_ids

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edits(self):
        """Get all `Edit^`s of this data node."""
        return self._edits

    def get_last_edit(self) -> Optional[Edit]:
        """Get last `Edit^` of this data node.

        Returns:
            None if there has been no `Edit^` on this data node.
        """
        return self._edits[-1] if self._edits else None

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def last_edit_date(self):
        last_modified_datetime = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None))
        if last_modified_datetime and last_modified_datetime > self._last_edit_date:
            return last_modified_datetime
        else:
            return self._last_edit_date

    @last_edit_date.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def last_edit_date(self, val):
        self._last_edit_date = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def scope(self):
        return self._scope

    @scope.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def scope(self, val):
        self._scope = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def validity_period(self) -> Optional[timedelta]:
        return self._validity_period if self._validity_period else None

    @validity_period.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def validity_period(self, val):
        self._validity_period = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def expiration_date(self) -> datetime:
        """Datetime instant of the expiration date of this data node."""
        last_edit_date = self.last_edit_date
        validity_period = self._validity_period

        if not last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")

        return last_edit_date + validity_period if validity_period else last_edit_date
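
    # Worked example (illustrative values): with last_edit_date = 2024-01-01 12:00 and
    # validity_period = timedelta(days=1), expiration_date is 2024-01-02 12:00; with no
    # validity_period, expiration_date equals last_edit_date itself.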

    @property  # type: ignore
    def name(self) -> Optional[str]:
        return self.properties.get("name")

    @name.setter  # type: ignore
    def name(self, val):
        self.properties["name"] = val

    @property
    def version(self):
        return self._version

    @property
    def cacheable(self):
        """Deprecated. Use `skippable` attribute of a `Task^` instead."""
        _warn_deprecated("cacheable", suggest="the skippable feature")
        return self.properties.get("cacheable", False)

    @cacheable.setter
    def cacheable(self, val):
        _warn_deprecated("cacheable", suggest="the skippable feature")

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edit_in_progress(self):
        return self._edit_in_progress

    @edit_in_progress.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def edit_in_progress(self, val):
        self._edit_in_progress = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_id(self):
        return self._editor_id

    @editor_id.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_id(self, val):
        self._editor_id = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_expiration_date(self):
        return self._editor_expiration_date

    @editor_expiration_date.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_expiration_date(self, val):
        self._editor_expiration_date = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def job_ids(self):
        """List of the jobs having edited this data node."""
        return [edit.get("job_id") for edit in self.edits if edit.get("job_id")]

    @property
    def properties(self):
        """Dictionary of custom properties."""
        self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
        return self._properties
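
    # Usage sketch (illustrative key name): custom properties behave like a dictionary,
    # e.g. dn.properties["source"] = "legacy ERP export", and are reloaded through the
    # data manager on access.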

    def _get_user_properties(self) -> Dict[str, Any]:
        """Get user properties."""
        return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}

    def __eq__(self, other):
        return isinstance(other, DataNode) and self.id == other.id

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(self.id)

    def __getstate__(self):
        return vars(self)

    def __setstate__(self, state):
        vars(self).update(state)

    @classmethod
    def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:
        if path and os.path.isfile(path):
            return datetime.fromtimestamp(os.path.getmtime(path))

        last_modified_datetime = None
        if path and os.path.isdir(path):
            for filename in os.listdir(path):
                filepath = os.path.join(path, filename)
                if os.path.isfile(filepath):
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                    if last_modified_datetime is None or file_mtime > last_modified_datetime:
                        last_modified_datetime = file_mtime

        return last_modified_datetime

    @classmethod
    @abstractmethod
    def storage_type(cls) -> str:
        raise NotImplementedError

    def read_or_raise(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node.

        Raises:
            NoData^: If the data has not been written yet.
        """
        if not self.last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
        return self._read()

    def read(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node. None if the data has not been written yet.
        """
        try:
            return self.read_or_raise()
        except NoData:
            self._logger.warning(
                f"Data node {self.id} from config {self.config_id} is being read but has never been written."
            )
            return None

    def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Append some data to this data node.

        Parameters:
            data (Any): The data to write to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory

        self._append(data)
        self.track_edit(job_id=job_id, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)

    def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
        """Write some data to this data node.

        Parameters:
            data (Any): The data to write to this data node.
            job_id (JobId^): An optional identifier of the writer.
            **kwargs (dict[str, any]): Extra information to attach to the edit document
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory

        self._write(data)
        self.track_edit(job_id=job_id, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)
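
    # Usage sketch (illustrative, "sales_history" is a hypothetical data node): any extra
    # keyword argument passed to write() or append() is forwarded to track_edit() and stored
    # in the resulting Edit entry:
    #
    #     sales_history.write(dataframe, comments="initial load from the 2024 export")
    #     sales_history.append(extra_rows, comments="late arrivals")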

    def track_edit(self, **options):
        """Creates and adds a new entry in the edits attribute without writing the data.

        Parameters:
            options (dict[str, any]): The recognized keys are `timestamp`, `comments`, and `job_id`.
                Any other key is user-defined and can be used to attach extra information to an
                external edit of a data node.
        """
        edit = {k: v for k, v in options.items() if v is not None}
        if "timestamp" not in edit:
            edit["timestamp"] = (
                self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None)) or datetime.now()
            )
        self.last_edit_date = edit.get("timestamp")
        self._edits.append(edit)
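
    # Usage sketch (illustrative): when the underlying file was modified outside of Taipy,
    # an edit entry can be recorded without rewriting the data:
    #
    #     dn.track_edit(comments="CSV refreshed by the nightly export job")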

    def lock_edit(self, editor_id: Optional[str] = None):
        """Lock the data node modification.

        Note:
            The data node can be unlocked with the method `(DataNode.)unlock_edit()^`.

        Parameters:
            editor_id (Optional[str]): The editor's identifier.
        """
        if editor_id:
            if (
                self.edit_in_progress
                and self.editor_id != editor_id
                and self.editor_expiration_date
                and self.editor_expiration_date > datetime.now()
            ):
                raise DataNodeIsBeingEdited(self.id, self._editor_id)
            self.editor_id = editor_id  # type: ignore
            self.editor_expiration_date = datetime.now() + timedelta(minutes=self.__EDIT_TIMEOUT)  # type: ignore
        else:
            self.editor_id = None  # type: ignore
            self.editor_expiration_date = None  # type: ignore
        self.edit_in_progress = True  # type: ignore

    def unlock_edit(self, editor_id: Optional[str] = None):
        """Unlocks the data node modification.

        Note:
            The data node can be locked with the method `(DataNode.)lock_edit()^`.

        Parameters:
            editor_id (Optional[str]): The editor's identifier.
        """
        if (
            editor_id
            and self.editor_id != editor_id
            and self.editor_expiration_date
            and self.editor_expiration_date > datetime.now()
        ):
            raise DataNodeIsBeingEdited(self.id, self._editor_id)

        self.editor_id = None
        self.editor_expiration_date = None
        self.edit_in_progress = False
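
    # Usage sketch (illustrative editor ids): an editor can reserve a data node before a
    # manual write; while the 30-minute lock is still valid, another editor is rejected:
    #
    #     dn.lock_edit(editor_id="alice")
    #     dn.lock_edit(editor_id="bob")      # raises DataNodeIsBeingEdited
    #     dn.unlock_edit(editor_id="alice")  # releases the lock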

    def filter(self, operators: Union[List, Tuple], join_operator=JoinOperator.AND):
        """Read and filter the data referenced by this data node.

        The data is filtered by the provided list of 3-tuples (key, value, `Operator^`).

        If multiple filter operators are provided, filtered data will be joined based on the
        join operator (*AND* or *OR*).

        Parameters:
            operators (Union[List[Tuple], Tuple]): A 3-element tuple or a list of 3-element tuples,
                each is in the form of (key, value, `Operator^`).
            join_operator (JoinOperator^): The operator used to join the multiple filter
                3-tuples.

        Returns:
            The filtered data.

        Raises:
            NotImplementedError: If the data type is not supported.
        """
        data = self._read()
        return _FilterDataNode._filter(data, operators, join_operator)
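
    # Usage sketch (illustrative column names; Operator lives in the same .operator module as
    # JoinOperator): keep rows where "age" > 18 or "vip" == True:
    #
    #     from taipy.core.data.operator import Operator
    #     dn.filter(
    #         [("age", 18, Operator.GREATER_THAN), ("vip", True, Operator.EQUAL)],
    #         JoinOperator.OR,
    #     )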

    def __getitem__(self, item):
        data = self._read()
        return _FilterDataNode._filter_by_key(data, item)

    @abstractmethod
    def _read(self):
        raise NotImplementedError

    def _append(self, data):
        raise NotImplementedError

    @abstractmethod
    def _write(self, data):
        raise NotImplementedError

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_ready_for_reading(self) -> bool:
        """Indicate if this data node is ready for reading.

        Returns:
            False if the data is locked for modification or if the data has never been written.
            True otherwise.
        """
        if self._edit_in_progress:
            return False
        if not self._last_edit_date:
            # Never been written so it is not up-to-date
            return False
        return True

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_valid(self) -> bool:
        """Indicate if this data node is valid.

        Returns:
            False if the data has never been written or the expiration date has passed.<br/>
            True otherwise.
        """
        if not self._last_edit_date:
            # Never been written so it is not valid
            return False
        if not self._validity_period:
            # No validity period and has already been written, so it is valid
            return True
        if datetime.now() > self.expiration_date:
            # expiration_date has been passed
            return False
        return True

    @property
    def is_up_to_date(self) -> bool:
        """Indicate if this data node is up-to-date.

        Returns:
            False if one of the preceding data nodes has been updated more recently than this
            data node, or if this data node is invalid.<br/>
            True otherwise.
        """
        if self.is_valid:
            from ..scenario.scenario import Scenario
            from ..taipy import get_parents

            parent_scenarios: Set[Scenario] = get_parents(self)["scenario"]  # type: ignore
            for parent_scenario in parent_scenarios:
                for ancestor_node in nx.ancestors(parent_scenario._build_dag(), self):
                    if (
                        isinstance(ancestor_node, DataNode)
                        and ancestor_node.last_edit_date
                        and ancestor_node.last_edit_date > self.last_edit_date
                    ):
                        return False
            return True
        return False

    @staticmethod
    def _class_map():
        def all_subclasses(cls):
            subclasses = set(cls.__subclasses__())
            for s in cls.__subclasses__():
                subclasses.update(all_subclasses(s))
            return subclasses

        class_map = {}
        for c in all_subclasses(DataNode):
            try:
                if c.storage_type() is not None:
                    class_map[c.storage_type()] = c
            except NotImplementedError:
                pass

        return class_map
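
    # Illustrative result (the actual keys depend on which DataNode subclasses are defined),
    # e.g. something like {"csv": CSVDataNode, "pickle": PickleDataNode, "sql": SQLDataNode, ...}.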

    def get_label(self) -> str:
        """Returns the data node simple label prefixed by its owner label.

        Returns:
            The label of the data node as a string.
        """
        return self._get_label()

    def get_simple_label(self) -> str:
        """Returns the data node simple label.

        Returns:
            The simple label of the data node as a string.
        """
        return self._get_simple_label()


@_make_event.register(DataNode)
def _make_event_for_datanode(
    data_node: DataNode,
    operation: EventOperation,
    /,
    attribute_name: Optional[str] = None,
    attribute_value: Optional[Any] = None,
    **kwargs,
) -> Event:
    metadata = {"config_id": data_node.config_id, "version": data_node._version, **kwargs}
    return Event(
        entity_type=EventEntityType.DATA_NODE,
        entity_id=data_node.id,
        operation=operation,
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        metadata=metadata,
    )
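
# Usage sketch (illustrative): _make_event dispatches on the entity type, so calling it with a
# DataNode instance routes to _make_event_for_datanode above:
#
#     event = _make_event(data_node, EventOperation.UPDATE, "name", "sales history")
#     assert event.entity_type is EventEntityType.DATA_NODE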