data_node.py
# Copyright 2021-2025 Avaiga Private Limited
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.

import functools
import os
import typing
import uuid
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast

import networkx as nx

from taipy.common.config import Config
from taipy.common.config.common._validate_id import _validate_id
from taipy.common.logger._taipy_logger import _TaipyLogger

from .._entity._entity import _Entity
from .._entity._labeled import _Labeled
from .._entity._properties import _Properties
from .._entity._ready_to_run_property import _ReadyToRunProperty
from .._entity._reload import _Reloader, _self_reload, _self_setter
from .._version._version_manager_factory import _VersionManagerFactory
from ..common.scope import Scope
from ..exceptions.exceptions import DataNodeIsBeingEdited, NoData
from ..job.job_id import JobId
from ..notification.event import Event, EventEntityType, EventOperation, _make_event
from ..reason import DataNodeEditInProgress, DataNodeIsNotWritten
from ._filter import _FilterDataNode
from .data_node_id import EDIT_COMMENT_KEY, EDIT_EDITOR_ID_KEY, EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY, DataNodeId, Edit
from .operator import JoinOperator
def _update_ready_for_reading(fct):
    # This decorator must be wrapped before self_setter decorator as self_setter will run the function twice.

    @functools.wraps(fct)
    def _recompute_is_ready_for_reading(dn: "DataNode", *args, **kwargs):
        fct(dn, *args, **kwargs)
        if dn._edit_in_progress:
            _ReadyToRunProperty._add(dn, DataNodeEditInProgress(dn.id))
        else:
            _ReadyToRunProperty._remove(dn, DataNodeEditInProgress(dn.id))
        if not dn._last_edit_date:
            _ReadyToRunProperty._add(dn, DataNodeIsNotWritten(dn.id))
        else:
            _ReadyToRunProperty._remove(dn, DataNodeIsNotWritten(dn.id))

    return _recompute_is_ready_for_reading
class DataNode(_Entity, _Labeled):
    """Reference to a dataset.

    A Data Node is an abstract class that holds metadata related to the data it refers to.
    In particular, a data node holds the name, the scope, the owner identifier, the last
    edit date, and some additional properties of the data.<br/>
    A Data Node also contains information and methods needed to access the dataset. This
    information depends on the type of storage, and it is held by subclasses (such as
    SQL Data Node, CSV Data Node, ...).

    !!! note
        It is not recommended to instantiate subclasses of `DataNode` directly. Instead,
        you have two ways:

        1. Create a Scenario using the `create_scenario()^` function. Related data nodes
            will be created automatically. Please refer to the `Scenario^` class for more
            information.
        2. Configure a `DataNodeConfig^` with the various configuration methods from `Config^`
            and use the `create_global_data_node()^` function as illustrated in the following
            example.

    A data node's attributes are populated based on its configuration `DataNodeConfig^`.

    !!! Example

        ```python
        import taipy as tp
        from taipy import Config

        if __name__ == "__main__":
            # Configure a global data node
            dataset_cfg = Config.configure_data_node("my_dataset", scope=tp.Scope.GLOBAL)

            # Instantiate a global data node
            dataset = tp.create_global_data_node(dataset_cfg)

            # Retrieve the list of all data nodes
            all_data_nodes = tp.get_data_nodes()

            # Write the data
            dataset.write("Hello, World!")

            # Read the data
            print(dataset.read())
        ```
    """
    _ID_PREFIX = "DATANODE"
    __ID_SEPARATOR = "_"
    _MANAGER_NAME: str = "data"
    _logger = _TaipyLogger._get_logger()
    _REQUIRED_PROPERTIES: List[str] = []
    _PATH_KEY = "path"
    __EDIT_TIMEOUT = 30

    _TAIPY_PROPERTIES: Set[str] = set()

    id: DataNodeId
    """The unique identifier of the data node."""
    def __init__(
        self,
        config_id,
        scope: Scope = Scope(Scope.SCENARIO),  # noqa: B008
        id: Optional[DataNodeId] = None,
        owner_id: Optional[str] = None,
        parent_ids: Optional[Set[str]] = None,
        last_edit_date: Optional[datetime] = None,
        edits: Optional[List[Edit]] = None,
        version: Optional[str] = None,
        validity_period: Optional[timedelta] = None,
        edit_in_progress: bool = False,
        editor_id: Optional[str] = None,
        editor_expiration_date: Optional[datetime] = None,
        **kwargs,
    ) -> None:
        self._config_id = _validate_id(config_id)
        self.id = id or self._new_id(self._config_id)
        self._owner_id = owner_id
        self._parent_ids = parent_ids or set()
        self._scope = scope
        self._last_edit_date: Optional[datetime] = last_edit_date
        self._edit_in_progress = edit_in_progress
        self._version = version or _VersionManagerFactory._build_manager()._get_latest_version()
        self._validity_period = validity_period
        self._editor_id: Optional[str] = editor_id
        self._editor_expiration_date: Optional[datetime] = editor_expiration_date

        # Track edits
        self._edits: List[Edit] = edits or []

        self._properties: _Properties = _Properties(self, **kwargs)
    def __eq__(self, other) -> bool:
        """Check if two data nodes are equal."""
        return isinstance(other, DataNode) and self.id == other.id

    def __ne__(self, other) -> bool:
        """Check if two data nodes are different."""
        return not self == other

    def __hash__(self) -> int:
        """Hash the data node."""
        return hash(self.id)

    def __getstate__(self) -> Dict[str, Any]:
        return vars(self)

    def __setstate__(self, state) -> None:
        vars(self).update(state)

    def __getitem__(self, item) -> Any:
        data = self._read()
        return _FilterDataNode._filter_by_key(data, item)
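    # A minimal usage sketch of bracket access (hypothetical `dataset` data node backed by a
    # pandas DataFrame; the "age" column is assumed to exist):
    #
    #     ages = dataset["age"]  # reads the data, then filters it by the "age" key
    #
    # The exact return type depends on the storage handled by the concrete subclass.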
    @property
    def config_id(self) -> str:
        """Identifier of the data node configuration. It must be a valid Python identifier."""
        return self._config_id

    @property
    def owner_id(self) -> Optional[str]:
        """The identifier of the owner (scenario_id, cycle_id or None)."""
        return self._owner_id

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def parent_ids(self) -> Set[str]:
        """The set of identifiers of the parent tasks."""
        return self._parent_ids
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edits(self) -> List[Edit]:
        """The list of Edits.

        Each `Edit` (an alias for dict) contains metadata about one edit of the data,
        including but not limited to:
        <ul><li>timestamp: The time instant of the writing.</li>
        <li>comments: A free text to explain or comment on a data change.</li>
        <li>job_id: Only populated when the data node is written by a task execution;
        it corresponds to the job's id.</li></ul>
        Additional metadata related to the edit made to the data node can also be provided in Edits.
        """
        return self._edits

    @edits.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def edits(self, val):
        self._edits = val
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def last_edit_date(self) -> Optional[datetime]:
        """The date and time of the last modification."""
        last_modified_datetime = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY, None))
        if last_modified_datetime and last_modified_datetime > self._last_edit_date:  # type: ignore
            return last_modified_datetime
        else:
            return self._last_edit_date

    @last_edit_date.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def last_edit_date(self, val):
        self._last_edit_date = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def scope(self) -> Scope:
        """The data node scope."""
        return self._scope

    @scope.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def scope(self, val):
        self._scope = val
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def validity_period(self) -> Optional[timedelta]:
        """The duration since the last edit date for which the data node is considered up-to-date.

        The duration implemented as a timedelta since the last edit date for which the data node
        can be considered up-to-date. Once the validity period has passed, the data node is
        considered stale and relevant tasks will run even if they are skippable (see the
        Task orchestration page of the user manual for more details).

        If _validity_period_ is set to `None`, the data node is always up-to-date.
        """
        return self._validity_period if self._validity_period else None

    @validity_period.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def validity_period(self, val):
        self._validity_period = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def expiration_date(self) -> datetime:
        """Datetime instant of the expiration date of this data node."""
        last_edit_date = self.last_edit_date
        validity_period = self._validity_period

        if not last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")

        return last_edit_date + validity_period if validity_period else last_edit_date
    @property  # type: ignore
    def name(self) -> Optional[str]:
        """A human-readable name of the data node."""
        return self.properties.get("name")

    @name.setter  # type: ignore
    def name(self, val):
        self.properties["name"] = val

    @property
    def version(self) -> str:
        """The string indicating the application version of the data node to instantiate.

        If not provided, the current version is used.
        """
        return self._version
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def edit_in_progress(self) -> bool:
        """True if the data node is locked for modification. False otherwise."""
        return self._edit_in_progress

    @edit_in_progress.setter  # type: ignore
    @_update_ready_for_reading
    @_self_setter(_MANAGER_NAME)
    def edit_in_progress(self, val):
        self._edit_in_progress = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_id(self) -> Optional[str]:
        """The identifier of the user who is currently editing the data node."""
        return self._editor_id

    @editor_id.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_id(self, val):
        self._editor_id = val

    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def editor_expiration_date(self) -> Optional[datetime]:
        """The expiration date of the editor lock."""
        return self._editor_expiration_date

    @editor_expiration_date.setter  # type: ignore
    @_self_setter(_MANAGER_NAME)
    def editor_expiration_date(self, val):
        self._editor_expiration_date = val
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def job_ids(self) -> List[JobId]:
        """List of the jobs having edited this data node."""
        return [job_id for edit in self.edits if (job_id := edit.get("job_id"))]

    @property
    def properties(self):
        """Dictionary of custom properties."""
        self._properties = _Reloader()._reload(self._MANAGER_NAME, self)._properties
        return self._properties
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_ready_for_reading(self) -> bool:
        """Indicate if this data node is ready for reading.

        False if the data is locked for modification or if the data has never been written.
        True otherwise.
        """
        if self._edit_in_progress:
            return False
        if not self._last_edit_date:
            # Never been written so it is not up-to-date
            return False
        return True
    @property  # type: ignore
    @_self_reload(_MANAGER_NAME)
    def is_valid(self) -> bool:
        """Indicate if this data node is valid.

        False if the data has never been written or the expiration date has passed.<br/>
        True otherwise.
        """
        if not self._last_edit_date:
            # Never been written so it is not valid
            return False
        if not self._validity_period:
            # No validity period and has already been written, so it is valid
            return True
        if datetime.now() > self.expiration_date:
            # expiration_date has been passed
            return False
        return True
    @property
    def is_up_to_date(self) -> bool:
        """Indicate if this data node is up-to-date.

        False if a preceding data node has been updated before the selected data node
        or the selected data is invalid.<br/>
        True otherwise.
        """
        if self.is_valid:
            from ..scenario.scenario import Scenario
            from ..taipy import get_parents

            parent_scenarios: Set[Scenario] = get_parents(self)["scenario"]  # type: ignore
            for parent_scenario in parent_scenarios:
                for ancestor_node in nx.ancestors(parent_scenario._build_dag(), self):
                    if (
                        isinstance(ancestor_node, DataNode)
                        and ancestor_node.last_edit_date
                        and ancestor_node.last_edit_date > cast(datetime, self.last_edit_date)
                    ):
                        return False
            return True
        return False
    @classmethod
    @abstractmethod
    def storage_type(cls) -> str:
        """The storage type of the data node.

        Each subclass must implement this method exposing the data node storage type.
        """
        raise NotImplementedError
    def read_or_raise(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node.

        Raises:
            NoData^: If the data has not been written yet.
        """
        if not self.last_edit_date:
            raise NoData(f"Data node {self.id} from config {self.config_id} has not been written yet.")
        return self._read()

    def read(self) -> Any:
        """Read the data referenced by this data node.

        Returns:
            The data referenced by this data node. None if the data has not been written yet.
        """
        try:
            return self.read_or_raise()
        except NoData:
            self._logger.warning(
                f"Data node {self.id} from config {self.config_id} is being read but has never been written."
            )
            return None
    def append(self, data, editor_id: Optional[str] = None, comment: Optional[str] = None, **kwargs: Any):
        """Append some data to this data node.

        Arguments:
            data (Any): The data to write to this data node.
            editor_id (str): An optional identifier of the editor.
            comment (str): An optional comment to attach to the edit document.
            **kwargs (Any): Extra information to attach to the edit document
                corresponding to this write.
        """
        from ._data_manager_factory import _DataManagerFactory

        if (
            editor_id
            and self.edit_in_progress
            and self.editor_id != editor_id
            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
        ):
            raise DataNodeIsBeingEdited(self.id, self.editor_id)
        self._append(data)
        self.track_edit(editor_id=editor_id, comment=comment, **kwargs)
        self.unlock_edit()
        _DataManagerFactory._build_manager()._set(self)
    def write(
        self,
        data,
        job_id: Optional[JobId] = None,
        editor_id: Optional[str] = None,
        comment: Optional[str] = None,
        **kwargs: Any,
    ):
        """Write some data to this data node.

        Once the data is written, the data node is unlocked and the edit is tracked.

        Arguments:
            data (Any): The data to write to this data node.
            job_id (JobId): An optional identifier of the job writing the data.
            editor_id (str): An optional identifier of the editor writing the data.
            comment (str): An optional comment to attach to the edit document.
            **kwargs (Any): Extra information to attach to the edit document
                corresponding to this write.
        """
        if (
            editor_id
            and self.edit_in_progress
            and self.editor_id != editor_id
            and (not self.editor_expiration_date or self.editor_expiration_date > datetime.now())
        ):
            raise DataNodeIsBeingEdited(self.id, self.editor_id)
        self._write(data)
        self.track_edit(job_id=job_id, editor_id=editor_id, comment=comment, **kwargs)
        self.unlock_edit()
        from ._data_manager_factory import _DataManagerFactory

        _DataManagerFactory._build_manager()._set(self)
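    # A minimal sketch of the write path (hypothetical `dataset` data node and editor ids,
    # inferred from the checks above rather than an official recipe):
    #
    #     dataset.lock_edit(editor_id="alice")
    #     dataset.write([1, 2, 3], editor_id="alice", comment="first draft")  # tracks an Edit, then unlocks
    #     dataset.write([4, 5, 6], editor_id="bob")  # fine now; it would raise DataNodeIsBeingEdited
    #                                                # only if "alice" still held an unexpired lock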
    def track_edit(
        self,
        job_id: Optional[str] = None,
        editor_id: Optional[str] = None,
        timestamp: Optional[datetime] = None,
        comment: Optional[str] = None,
        **options: Any,
    ):
        """Creates and adds a new entry in the edits attribute without writing the data.

        Arguments:
            job_id (Optional[str]): The optional identifier of the job writing the data.
            editor_id (Optional[str]): The optional identifier of the editor writing the data.
            timestamp (Optional[datetime]): The optional timestamp of the edit. If not provided, the
                current time is used.
            comment (Optional[str]): The optional comment of the edit.
            **options (Any): User-custom attributes to attach to the edit.
        """
        edit = {k: v for k, v in options.items() if v is not None}
        if job_id:
            edit[EDIT_JOB_ID_KEY] = job_id
        if editor_id:
            edit[EDIT_EDITOR_ID_KEY] = editor_id
        if comment:
            edit[EDIT_COMMENT_KEY] = comment
        if not timestamp:
            timestamp = self._get_last_modified_datetime(self._properties.get(self._PATH_KEY)) or datetime.now()
        edit[EDIT_TIMESTAMP_KEY] = timestamp
        self.last_edit_date = edit.get(EDIT_TIMESTAMP_KEY)
        self._edits.append(typing.cast(Edit, edit))
        self.edits = self._edits
    def lock_edit(self, editor_id: Optional[str] = None):
        """Lock the data node modification.

        Note:
            The data node can be unlocked with the method `(DataNode.)unlock_edit()^`.

        Arguments:
            editor_id (Optional[str]): The editor's identifier.
        """
        if editor_id:
            if (
                self.edit_in_progress
                and self.editor_id != editor_id
                and self.editor_expiration_date
                and self.editor_expiration_date > datetime.now()
            ):
                raise DataNodeIsBeingEdited(self.id, self._editor_id)
            self.editor_id = editor_id  # type: ignore
            self.editor_expiration_date = datetime.now() + timedelta(minutes=self.__EDIT_TIMEOUT)  # type: ignore
        else:
            self.editor_id = None  # type: ignore
            self.editor_expiration_date = None  # type: ignore
        self.edit_in_progress = True  # type: ignore
    def unlock_edit(self, editor_id: Optional[str] = None):
        """Unlocks the data node modification.

        Note:
            The data node can be locked with the method `(DataNode.)lock_edit()^`.

        Arguments:
            editor_id (Optional[str]): The editor's identifier.
        """
        if (
            editor_id
            and self.editor_id != editor_id
            and self.editor_expiration_date
            and self.editor_expiration_date > datetime.now()
        ):
            raise DataNodeIsBeingEdited(self.id, self._editor_id)
        self.editor_id = None
        self.editor_expiration_date = None
        self.edit_in_progress = False
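    # A minimal sketch of the editor lock lifecycle (hypothetical editor ids; the 30-minute
    # expiration comes from __EDIT_TIMEOUT above):
    #
    #     dataset.lock_edit(editor_id="alice")    # locks and sets editor_expiration_date
    #     dataset.unlock_edit(editor_id="bob")    # raises DataNodeIsBeingEdited while alice's lock is valid
    #     dataset.unlock_edit(editor_id="alice")  # releases the lock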
    def filter(self, operators: Union[List, Tuple], join_operator=JoinOperator.AND) -> Any:
        """Read and filter the data referenced by this data node.

        The data is filtered by the provided list of 3-tuples (key, value, `Operator^`).

        If multiple filter operators are provided, filtered data will be joined based on the
        join operator (*AND* or *OR*).

        Arguments:
            operators (Union[List[Tuple], Tuple]): A 3-element tuple or a list of 3-element tuples,
                each is in the form of (key, value, `Operator^`).
            join_operator (JoinOperator^): The operator used to join the multiple filter
                3-tuples.

        Returns:
            The filtered data.

        Raises:
            NotImplementedError: If the data type is not supported.
        """
        data = self._read()
        return _FilterDataNode._filter(data, operators, join_operator)
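    # A minimal filtering sketch (hypothetical data node and column names; the import path of
    # `Operator` is assumed to mirror the `JoinOperator` import at the top of this module):
    #
    #     from taipy.core.data.operator import JoinOperator, Operator
    #
    #     adults = dataset.filter([("age", 18, Operator.GREATER_OR_EQUAL)])
    #     subset = dataset.filter(
    #         [("age", 18, Operator.GREATER_OR_EQUAL), ("country", "FR", Operator.EQUAL)],
    #         JoinOperator.AND,
    #     )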
    def get_label(self) -> str:
        """Returns the data node simple label prefixed by its owner label.

        Returns:
            The label of the data node as a string.
        """
        return self._get_label()

    def get_simple_label(self) -> str:
        """Returns the data node simple label.

        Returns:
            The simple label of the data node as a string.
        """
        return self._get_simple_label()
    def get_parents(self) -> Dict[str, Set[_Entity]]:
        """Get all parents of this data node.

        Returns:
            The dictionary of all parent entities.
            They are grouped by their type (`Scenario^`, `Sequence^`, or `Task^`), so each key
            corresponds to a level of the parents and the value is a set of the parent entities.
            An empty dictionary is returned if the entity does not have parents.
        """
        from ... import core as tp

        return tp.get_parents(self)
    def get_last_edit(self) -> Optional[Edit]:
        """Get last `Edit` of this data node.

        Returns:
            None if there has been no `Edit` on this data node.
        """
        return self._edits[-1] if self._edits else None
    def _get_rank(self, scenario_config_id: str) -> int:
        """Get the data node rank for given scenario config.

        The rank corresponds to the order of appearance of the data nodes in a scenario config DAG.

        Arguments:
            scenario_config_id (str): The identifier of the scenario config used to
                get the data node rank.

        Returns:
            The int value representing the rank of the data node config in the scenario config DAG.
            If the scenario config is None or an empty string, 0xfffb is returned.<br/>
            If the data node config is not found, 0xfffd is returned. This case cannot
            happen in a normal situation.<br/>
            If the data node config has no precomputed ranks, 0xfffe is returned. This case
            cannot happen in a normal situation.<br/>
            If the data node config is not part of the scenario config, 0xfffc is returned as an infinite rank.
        """
        if not scenario_config_id:
            return 0xfffb
        dn_config = Config.data_nodes.get(self._config_id, None)
        if not dn_config:
            self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` is not found.")
            return 0xfffd
        if not dn_config._ranks:
            self._logger.error(f"Data node config `{self.config_id}` for data node `{self.id}` has no rank.")
            return 0xfffe
        return dn_config._ranks.get(scenario_config_id, 0xfffc)
    @abstractmethod
    def _read(self):
        raise NotImplementedError

    def _append(self, data):
        raise NotImplementedError

    @abstractmethod
    def _write(self, data):
        raise NotImplementedError

    @staticmethod
    def _new_id(config_id: str) -> DataNodeId:
        """Generate a unique datanode identifier."""
        return DataNodeId(
            DataNode.__ID_SEPARATOR.join([DataNode._ID_PREFIX, _validate_id(config_id), str(uuid.uuid4())])
        )
    def _get_user_properties(self) -> Dict[str, Any]:
        """Get user properties."""
        return {key: value for key, value in self.properties.items() if key not in self._TAIPY_PROPERTIES}

    @classmethod
    def _get_last_modified_datetime(cls, path: Optional[str] = None) -> Optional[datetime]:
        if path and os.path.isfile(path):
            return datetime.fromtimestamp(os.path.getmtime(path))

        last_modified_datetime = None
        if path and os.path.isdir(path):
            for filename in os.listdir(path):
                filepath = os.path.join(path, filename)
                if os.path.isfile(filepath):
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                    if last_modified_datetime is None or file_mtime > last_modified_datetime:
                        last_modified_datetime = file_mtime

        return last_modified_datetime
    @staticmethod
    def _class_map():
        def all_subclasses(cls):
            subclasses = set(cls.__subclasses__())
            for s in cls.__subclasses__():
                subclasses.update(all_subclasses(s))
            return subclasses

        class_map = {}
        for c in all_subclasses(DataNode):
            try:
                if c.storage_type() is not None:
                    class_map[c.storage_type()] = c
            except NotImplementedError:
                pass

        return class_map
@_make_event.register(DataNode)
def _make_event_for_datanode(
    data_node: DataNode,
    operation: EventOperation,
    /,
    attribute_name: Optional[str] = None,
    attribute_value: Optional[Any] = None,
    **kwargs,
) -> Event:
    metadata = {"config_id": data_node.config_id, "version": data_node._version, **kwargs}
    return Event(
        entity_type=EventEntityType.DATA_NODE,
        entity_id=data_node.id,
        operation=operation,
        attribute_name=attribute_name,
        attribute_value=attribute_value,
        metadata=metadata,
    )