data_node.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import os
  12. import uuid
  13. from abc import abstractmethod
  14. from datetime import datetime, timedelta
  15. from typing import Any, Dict, List, Optional, Set, Tuple, Union
  16. import networkx as nx
  17. from taipy.config.common._validate_id import _validate_id
  18. from taipy.config.common.scope import Scope
  19. from taipy.logger._taipy_logger import _TaipyLogger
  20. from .._entity._entity import _Entity
  21. from .._entity._labeled import _Labeled
  22. from .._entity._properties import _Properties
  23. from .._entity._reload import _Reloader, _self_reload, _self_setter
  24. from .._version._version_manager_factory import _VersionManagerFactory
  25. from ..common._warnings import _warn_deprecated
  26. from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
  27. from ..job.job_id import JobId
  28. from ..notification.event import Event, EventEntityType, EventOperation, _make_event
  29. from ._filter import _FilterDataNode
  30. from .data_node_id import DataNodeId, Edit
  31. from .operator import JoinOperator
  32. class DataNode(_Entity, _Labeled):
  33. """Reference to a dataset.
  34. A Data Node is an abstract class that holds metadata related to the dataset it refers to.
  35. In particular, a data node holds the name, the scope, the owner identifier, the last
  36. edit date, and some additional properties of the data.<br/>
  37. A Data Node also contains information and methods needed to access the dataset. This
  38. information depends on the type of storage, and it is held by subclasses (such as
  39. SQL Data Node, CSV Data Node, ...).
  40. !!! note
  41. It is recommended not to instantiate subclasses of `DataNode` directly.
  42. Attributes:
  43. config_id (str): Identifier of the data node configuration. It must be a valid Python
  44. identifier.
  45. scope (Scope^): The scope of this data node.
  46. id (str): The unique identifier of this data node.
  47. name (str): A user-readable name of this data node.
  48. owner_id (str): The identifier of the owner (sequence_id, scenario_id, cycle_id) or
  49. None.
  50. parent_ids (Optional[Set[str]]): The set of identifiers of the parent tasks.
  51. last_edit_date (datetime): The date and time of the last modification.
  52. edits (List[Edit^]): The list of Edits (an alias for dict) containing metadata about each
  53. data edition including but not limited to timestamp, comments, job_id:
  54. timestamp: The time instant of the writing
  55. comments: Representation of a free text to explain or comment on a data change
  56. job_id: Only populated when the data node is written by a task execution and corresponds to the job's id.
  57. Additional metadata related to the edition made to the data node can also be provided in Edits.
  58. version (str): The string indicates the application version of the data node to
  59. instantiate. If not provided, the current version is used.
  60. validity_period (Optional[timedelta]): The duration implemented as a timedelta since the last edit date for
  61. which the data node can be considered up-to-date. Once the validity period has passed, the data node is
  62. considered stale and relevant tasks will run even if they are skippable (see the
  63. [Task management page](../core/entities/task-mgt.md) for more details).
  64. If _validity_period_ is set to `None`, the data node is always up-to-date.
  65. edit_in_progress (bool): True if the data node is locked for modification. False
  66. otherwise.
  67. editor_id (Optional[str]): The identifier of the user who is currently editing the data node.
  68. editor_expiration_date (Optional[datetime]): The expiration date of the editor lock.
  69. kwargs: A dictionary of additional properties.
  70. """
  71. _ID_PREFIX = "DATANODE"
  72. __ID_SEPARATOR = "_"
  73. __logger = _TaipyLogger._get_logger()
  74. _REQUIRED_PROPERTIES: List[str] = []
  75. _MANAGER_NAME: str = "data"
  76. _PATH_KEY = "path"
  77. __EDIT_TIMEOUT = 30
  78. _TAIPY_PROPERTIES: Set[str] = set()
  79. def __init__(
  80. self,
  81. config_id,
  82. scope: Scope = Scope(Scope.SCENARIO), # noqa: B008
  83. id: Optional[DataNodeId] = None,
  84. owner_id: Optional[str] = None,
  85. parent_ids: Optional[Set[str]] = None,
  86. last_edit_date: Optional[datetime] = None,
  87. edits: Optional[List[Edit]] = None,
  88. version: Optional[str] = None,
  89. validity_period: Optional[timedelta] = None,
  90. edit_in_progress: bool = False,
  91. editor_id: Optional[str] = None,
  92. editor_expiration_date: Optional[datetime] = None,
  93. **kwargs,
  94. ) -> None:
  95. self._config_id = _validate_id(config_id)
  96. self.id = id or self._new_id(self._config_id)
  97. self._owner_id = owner_id
  98. self._parent_ids = parent_ids or set()
  99. self._scope = scope
  100. self._last_edit_date: Optional[datetime] = last_edit_date
  101. self._edit_in_progress = edit_in_progress
  102. self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
  103. self._validity_period = validity_period
  104. self._editor_id: Optional[str] = editor_id
  105. self._editor_expiration_date: Optional[datetime] = editor_expiration_date
  106. # Track edits
  107. self._edits = edits or []
  108. self._properties = _Properties(self, **kwargs)
  109. @staticmethod
  110. def _new_id(config_id: str) -> DataNodeId:
  111. """Generate a unique datanode identifier."""
  112. return DataNodeId(
  113. DataNode.__ID_SEPARATOR.join([DataNode._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())])
  114. )
  115. @property
  116. def config_id(self):
  117. return self._config_id
  118. @property
  119. def owner_id(self):
  120. return self._owner_id
  121. def get_parents(self):
  122. """Get all parents of this data node."""
  123. from ... import core as tp
  124. return tp.get_parents(self)
  125. @property # type: ignore
  126. @_self_reload(_MANAGER_NAME)
  127. def parent_ids(self):
  128. """List of parent ids of this data node."""
  129. return self._parent_ids
  130. @property # type: ignore
  131. @_self_reload(_MANAGER_NAME)
  132. def edits(self):
  133. """Get all `Edit^`s of this data node."""
  134. return self._edits
  135. def get_last_edit(self) -> Optional[Edit]:
  136. """Get last `Edit^` of this data node.
  137. Returns:
  138. None if there has been no `Edit^` on this data node.
  139. """
  140. return self._edits[-1] if self._edits else None
  141. @property # type: ignore
  142. @_self_reload(_MANAGER_NAME)
  143. def last_edit_date(self):
  144. last_modified_datetime = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None))
  145. if last_modified_datetime and last_modified_datetime > self._last_edit_date:
  146. return last_modified_datetime
  147. else:
  148. return self._last_edit_date
  149. @last_edit_date.setter # type: ignore
  150. @_self_setter(_MANAGER_NAME)
  151. def last_edit_date(self, val):
  152. self._last_edit_date = val
  153. @property # type: ignore
  154. @_self_reload(_MANAGER_NAME)
  155. def scope(self):
  156. return self._scope
  157. @scope.setter # type: ignore
  158. @_self_setter(_MANAGER_NAME)
  159. def scope(self, val):
  160. self._scope = val
  161. @property # type: ignore
  162. @_self_reload(_MANAGER_NAME)
  163. def validity_period(self) -> Optional[timedelta]:
  164. return self._validity_period if self._validity_period else None
  165. @validity_period.setter # type: ignore
  166. @_self_setter(_MANAGER_NAME)
  167. def validity_period(self, val):
  168. self._validity_period = val
  169. @property # type: ignore
  170. @_self_reload(_MANAGER_NAME)
  171. def expiration_date(self) -> datetime:
  172. """Datetime instant of the expiration date of this data node."""
  173. last_edit_date = self.last_edit_date
  174. validity_period = self._validity_period
  175. if not last_edit_date:
  176. raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
  177. return last_edit_date + validity_period if validity_period else last_edit_date
  178. @property # type: ignore
  179. def name(self) -> Optional[str]:
  180. return self.properties.get("name")
  181. @name.setter # type: ignore
  182. def name(self, val):
  183. self.properties["name"] = val
  184. @property
  185. def version(self):
  186. return self._version
  187. @property
  188. def cacheable(self):
  189. """Deprecated. Use `skippable` attribute of a `Task^` instead."""
  190. _warn_deprecated("cacheable", suggest="the skippable feature")
  191. return self.properties.get("cacheable", False)
  192. @cacheable.setter
  193. def cacheable(self, val):
  194. _warn_deprecated("cacheable", suggest="the skippable feature")
  195. @property # type: ignore
  196. @_self_reload(_MANAGER_NAME)
  197. def edit_in_progress(self):
  198. return self._edit_in_progress
  199. @edit_in_progress.setter # type: ignore
  200. @_self_setter(_MANAGER_NAME)
  201. def edit_in_progress(self, val):
  202. self._edit_in_progress = val
  203. @property # type: ignore
  204. @_self_reload(_MANAGER_NAME)
  205. def editor_id(self):
  206. return self._editor_id
  207. @editor_id.setter # type: ignore
  208. @_self_setter(_MANAGER_NAME)
  209. def editor_id(self, val):
  210. self._editor_id = val
  211. @property # type: ignore
  212. @_self_reload(_MANAGER_NAME)
  213. def editor_expiration_date(self):
  214. return self._editor_expiration_date
  215. @editor_expiration_date.setter # type: ignore
  216. @_self_setter(_MANAGER_NAME)
  217. def editor_expiration_date(self, val):
  218. self._editor_expiration_date = val
  219. @property # type: ignore
  220. @_self_reload(_MANAGER_NAME)
  221. def job_ids(self):
  222. """List of the jobs having edited this data node."""
  223. return [edit.get("job_id") for edit in self.edits if edit.get("job_id")]
  224. @property
  225. def properties(self):
  226. """Dictionary of custom properties."""
  227. self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
  228. return self._properties
  229. def _get_user_properties(self) -> Dict[str, Any]:
  230. """Get user properties."""
  231. return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}
  232. def __eq__(self, other):
  233. return isinstance(other, DataNode) and self.id == other.id
  234. def __ne__(self, other):
  235. return not self == other
  236. def __hash__(self):
  237. return hash(self.id)
  238. def __getstate__(self):
  239. return vars(self)
  240. def __setstate__(self, state):
  241. vars(self).update(state)
  242. def __getattr__(self, attribute_name):
  243. protected_attribute_name = _validate_id(attribute_name)
  244. if protected_attribute_name in self._properties:
  245. return self._properties[protected_attribute_name]
  246. raise AttributeError(f"{attribute_name} is not an attribute of data node {self.id}")
  247. @classmethod
  248. def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:
  249. if path and os.path.isfile(path):
  250. return datetime.fromtimestamp(os.path.getmtime(path))
  251. last_modified_datetime = None
  252. if path and os.path.isdir(path):
  253. for filename in os.listdir(path):
  254. filepath = os.path.join(path, filename)
  255. if os.path.isfile(filepath):
  256. file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
  257. if last_modified_datetime is None or file_mtime > last_modified_datetime:
  258. last_modified_datetime = file_mtime
  259. return last_modified_datetime
  260. @classmethod
  261. @abstractmethod
  262. def storage_type(cls) -> str:
  263. raise NotImplementedError
  264. def read_or_raise(self) -> Any:
  265. """Read the data referenced by this data node.
  266. Returns:
  267. The data referenced by this data node.
  268. Raises:
  269. NoData^: If the data has not been written yet.
  270. """
  271. if not self.last_edit_date:
  272. raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
  273. return self._read()
  274. def read(self) -> Any:
  275. """Read the data referenced by this data node.
  276. Returns:
  277. The data referenced by this data node. None if the data has not been written yet.
  278. """
  279. try:
  280. return self.read_or_raise()
  281. except NoData:
  282. self.__logger.warning(
  283. f"Data node {self.id} from config {self.config_id} is being read but has never been " f"written."
  284. )
  285. return None
  286. def append(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
  287. """Append some data to this data node.
  288. Parameters:
  289. data (Any): The data to write to this data node.
  290. job_id (JobId^): An optional identifier of the writer.
  291. **kwargs (dict[str, any]): Extra information to attach to the edit document
  292. corresponding to this write.
  293. """
  294. from ._data_manager_factory import _DataManagerFactory
  295. self._append(data)
  296. self.track_edit(job_id=job_id, **kwargs)
  297. self.unlock_edit()
  298. _DataManagerFactory._build_manager()._set(self)
  299. def write(self, data, job_id: Optional[JobId] = None, **kwargs: Dict[str, Any]):
  300. """Write some data to this data node.
  301. Parameters:
  302. data (Any): The data to write to this data node.
  303. job_id (JobId^): An optional identifier of the writer.
  304. **kwargs (dict[str, any]): Extra information to attach to the edit document
  305. corresponding to this write.
  306. """
  307. from ._data_manager_factory import _DataManagerFactory
  308. self._write(data)
  309. self.track_edit(job_id=job_id, **kwargs)
  310. self.unlock_edit()
  311. _DataManagerFactory._build_manager()._set(self)
  312. def track_edit(self, **options):
  313. """Creates and adds a new entry in the edits attribute without writing the data.
  314. Parameters:
  315. options (dict[str, any)): track `timestamp`, `comments`, `job_id`. The others are user-custom, users can
  316. use options to attach any information to an external edit of a data node.
  317. """
  318. edit = {k: v for k, v in options.items() if v is not None}
  319. if "timestamp" not in edit:
  320. edit["timestamp"] = (
  321. self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None)) or datetime.now()
  322. )
  323. self.last_edit_date = edit.get("timestamp")
  324. self._edits.append(edit)
  325. def lock_edit(self, editor_id: Optional[str] = None):
  326. """Lock the data node modification.
  327. Note:
  328. The data node can be unlocked with the method `(DataNode.)unlock_edit()^`.
  329. Parameters:
  330. editor_id (Optional[str]): The editor's identifier.
  331. """
  332. if editor_id:
  333. if (
  334. self.edit_in_progress
  335. and self.editor_id != editor_id
  336. and self.editor_expiration_date
  337. and self.editor_expiration_date > datetime.now()
  338. ):
  339. raise DataNodeIsBeingEdited(self.id, self._editor_id)
  340. self.editor_id = editor_id # type: ignore
  341. self.editor_expiration_date = datetime.now() + timedelta(minutes=self.__EDIT_TIMEOUT) # type: ignore
  342. else:
  343. self.editor_id = None # type: ignore
  344. self.editor_expiration_date = None # type: ignore
  345. self.edit_in_progress = True # type: ignore
  346. def unlock_edit(self, editor_id: Optional[str] = None):
  347. """Unlocks the data node modification.
  348. Note:
  349. The data node can be locked with the method `(DataNode.)lock_edit()^`.
  350. Parameters:
  351. editor_id (Optional[str]): The editor's identifier.
  352. """
  353. if (
  354. editor_id
  355. and self.editor_id != editor_id
  356. and self.editor_expiration_date
  357. and self.editor_expiration_date > datetime.now()
  358. ):
  359. raise DataNodeIsBeingEdited(self.id, self._editor_id)
  360. self.editor_id = None
  361. self.editor_expiration_date = None
  362. self.edit_in_progress = False
  363. def filter(self, operators: Union[List, Tuple], join_operator=JoinOperator.AND):
  364. """Read and filter the data referenced by this data node.
  365. The data is filtered by the provided list of 3-tuples (key, value, `Operator^`).
  366. If multiple filter operators are provided, filtered data will be joined based on the
  367. join operator (*AND* or *OR*).
  368. Parameters:
  369. operators (Union[List[Tuple], Tuple]): A 3-element tuple or a list of 3-element tuples,
  370. each is in the form of (key, value, `Operator^`).
  371. join_operator (JoinOperator^): The operator used to join the multiple filter
  372. 3-tuples.
  373. Returns:
  374. The filtered data.
  375. Raises:
  376. NotImplementedError: If the data type is not supported.
  377. """
  378. data = self._read()
  379. return _FilterDataNode._filter(data, operators, join_operator)
  380. def __getitem__(self, item):
  381. data = self._read()
  382. return _FilterDataNode._filter_by_key(data, item)
  383. @abstractmethod
  384. def _read(self):
  385. raise NotImplementedError
  386. def _append(self, data):
  387. raise NotImplementedError
  388. @abstractmethod
  389. def _write(self, data):
  390. raise NotImplementedError
  391. @property # type: ignore
  392. @_self_reload(_MANAGER_NAME)
  393. def is_ready_for_reading(self) -> bool:
  394. """Indicate if this data node is ready for reading.
  395. Returns:
  396. False if the data is locked for modification or if the data has never been written.
  397. True otherwise.
  398. """
  399. if self._edit_in_progress:
  400. return False
  401. if not self._last_edit_date:
  402. # Never been written so it is not up-to-date
  403. return False
  404. return True
  405. @property # type: ignore
  406. @_self_reload(_MANAGER_NAME)
  407. def is_valid(self) -> bool:
  408. """Indicate if this data node is valid.
  409. Returns:
  410. False if the data ever been written or the expiration date has passed.<br/>
  411. True otherwise.
  412. """
  413. if not self._last_edit_date:
  414. # Never been written so it is not valid
  415. return False
  416. if not self._validity_period:
  417. # No validity period and has already been written, so it is valid
  418. return True
  419. if datetime.now() > self.expiration_date:
  420. # expiration_date has been passed
  421. return False
  422. return True
  423. @property
  424. def is_up_to_date(self) -> bool:
  425. """Indicate if this data node is up-to-date.
  426. Returns:
  427. False if a preceding data node has been updated before the selected data node
  428. or the selected data is invalid.<br/>
  429. True otherwise.
  430. """
  431. if self.is_valid:
  432. from ..scenario.scenario import Scenario
  433. from ..taipy import get_parents
  434. parent_scenarios: Set[Scenario] = get_parents(self)["scenario"] # type: ignore
  435. for parent_scenario in parent_scenarios:
  436. for ancestor_node in nx.ancestors(parent_scenario._build_dag(), self):
  437. if (
  438. isinstance(ancestor_node, DataNode)
  439. and ancestor_node.last_edit_date
  440. and ancestor_node.last_edit_date > self.last_edit_date
  441. ):
  442. return False
  443. return True
  444. return False
  445. @staticmethod
  446. def _class_map():
  447. def all_subclasses(cls):
  448. subclasses = set(cls.__subclasses__())
  449. for s in cls.__subclasses__():
  450. subclasses.update(all_subclasses(s))
  451. return subclasses
  452. class_map = {}
  453. for c in all_subclasses(DataNode):
  454. try:
  455. if c.storage_type() is not None:
  456. class_map[c.storage_type()] = c
  457. except NotImplementedError:
  458. pass
  459. return class_map
  460. def get_label(self) -> str:
  461. """Returns the data node simple label prefixed by its owner label.
  462. Returns:
  463. The label of the data node as a string.
  464. """
  465. return self._get_label()
  466. def get_simple_label(self) -> str:
  467. """Returns the data node simple label.
  468. Returns:
  469. The simple label of the data node as a string.
  470. """
  471. return self._get_simple_label()
  472. @_make_event.register(DataNode)
  473. def _make_event_for_datanode(
  474. data_node: DataNode,
  475. operation: EventOperation,
  476. /,
  477. attribute_name: Optional[str] = None,
  478. attribute_value: Optional[Any] = None,
  479. **kwargs,
  480. ) -> Event:
  481. metadata = {"config_id": data_node.config_id, "version": data_node._version, **kwargs}
  482. return Event(
  483. entity_type=EventEntityType.DATA_NODE,
  484. entity_id=data_node.id,
  485. operation=operation,
  486. attribute_name=attribute_name,
  487. attribute_value=attribute_value,
  488. metadata=metadata,
  489. )