_context.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import datetime
  12. import json
  13. import typing as t
  14. import zoneinfo
  15. from collections import defaultdict
  16. from numbers import Number
  17. from pathlib import Path
  18. from threading import Lock
  19. import pandas as pd
  20. from dateutil import parser
  21. from taipy.common.config import Config
  22. from taipy.core import (
  23. Cycle,
  24. DataNode,
  25. DataNodeId,
  26. Job,
  27. JobId,
  28. Scenario,
  29. ScenarioId,
  30. Sequence,
  31. SequenceId,
  32. Submission,
  33. SubmissionId,
  34. can_create,
  35. cancel_job,
  36. create_scenario,
  37. delete_job,
  38. get_cycles_scenarios,
  39. get_data_nodes,
  40. get_jobs,
  41. is_deletable,
  42. is_editable,
  43. is_promotable,
  44. is_readable,
  45. is_submittable,
  46. set_primary,
  47. )
  48. from taipy.core import delete as core_delete
  49. from taipy.core import get as core_get
  50. from taipy.core import submit as core_submit
  51. from taipy.core.data._file_datanode_mixin import _FileDataNodeMixin
  52. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  53. from taipy.core.notification.event import Event, EventOperation
  54. from taipy.core.notification.notifier import Notifier
  55. from taipy.core.reason import ReasonCollection
  56. from taipy.core.submission.submission_status import SubmissionStatus
  57. from taipy.gui import Gui, State, get_state_id
  58. from taipy.gui._warnings import _warn
  59. from taipy.gui.gui import _DoNotUpdate
  60. from taipy.gui.utils._map_dict import _MapDict
  61. from ._adapters import (
  62. _EntityType,
  63. _get_entity_property,
  64. _GuiCoreDatanodeAdapter,
  65. _GuiCoreScenarioProperties,
  66. _invoke_action,
  67. )
  68. from .filters import CustomScenarioFilter
  69. class _GuiCoreContext(CoreEventConsumerBase):
  70. __PROP_ENTITY_ID = "id"
  71. __PROP_ENTITY_COMMENT = "comment"
  72. __PROP_CONFIG_ID = "config"
  73. __PROP_DATE = "date"
  74. __PROP_ENTITY_NAME = "name"
  75. __PROP_SCENARIO_PRIMARY = "primary"
  76. __PROP_SCENARIO_TAGS = "tags"
  77. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  78. __ACTION = "action"
  79. _CORE_CHANGED_NAME = "core_changed"
  80. _AUTH_CHANGED_NAME = "auth_changed"
  81. def __init__(self, gui: Gui) -> None:
  82. self.gui = gui
  83. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  84. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
  85. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  86. self.jobs_list: t.Optional[t.List[Job]] = None
  87. self.client_submission: t.Dict[str, SubmissionStatus] = {}
  88. # register to taipy core notification
  89. reg_id, reg_queue = Notifier.register()
  90. # locks
  91. self.lock = Lock()
  92. self.submissions_lock = Lock()
  93. # lazy_start
  94. self.__started = False
  95. # Gui event listener
  96. gui._add_event_listener("authorization", self._auth_listener, with_state=True)
  97. # super
  98. super().__init__(reg_id, reg_queue)
  99. def on_user_init(self, state: State):
  100. self.gui._fire_event("authorization", get_state_id(state), {})
  101. def __lazy_start(self):
  102. if self.__started:
  103. return
  104. self.__started = True
  105. self.start()
    def process_event(self, event: Event):
        """Handle one core notification: invalidate the matching cache and broadcast the change.

        Behavior per `event.entity_type`:
          * SCENARIO: calls `scenario_refresh()` under a system authorization.
          * SEQUENCE (with an entity id): broadcasts the parent ids of the sequence.
          * JOB: resets `jobs_list`; broadcasts only when the job was deleted.
          * SUBMISSION: delegates to `submission_status_callback()`.
          * DATA_NODE: resets `data_nodes_by_owner` and broadcasts the data node.

        Args:
            event: the core event to process.
        """
        self.__lazy_start()
        if event.entity_type is EventEntityType.SCENARIO:
            with self.gui._get_authorization(system=True):
                # The id is forwarded on deletion or when the scenario is readable;
                # otherwise None is passed and the broadcast carries True instead.
                self.scenario_refresh(
                    event.entity_id
                    if event.operation is EventOperation.DELETION or is_readable(t.cast(ScenarioId, event.entity_id))
                    else None
                )
        elif event.entity_type is EventEntityType.SEQUENCE and event.entity_id:
            sequence = None
            try:
                with self.gui._get_authorization(system=True):
                    # A deleted or unreadable sequence is not fetched.
                    sequence = (
                        core_get(event.entity_id)
                        if event.operation is not EventOperation.DELETION
                        and is_readable(t.cast(SequenceId, event.entity_id))
                        else None
                    )
                    if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids:  # type: ignore
                        # The parent ids are sent under the "scenario" key.
                        self.broadcast_core_changed({"scenario": list(sequence.parent_ids)})  # type: ignore
            except Exception as e:
                _warn(f"Access to sequence {event.entity_id} failed", e)
        elif event.entity_type is EventEntityType.JOB:
            with self.lock:
                self.jobs_list = None
            # no broadcast because the submission status will do the job
            if event.operation is EventOperation.DELETION:
                self.broadcast_core_changed({"jobs": True})
        elif event.entity_type is EventEntityType.SUBMISSION:
            self.submission_status_callback(event.entity_id, event)
        elif event.entity_type is EventEntityType.DATA_NODE:
            with self.lock:
                self.data_nodes_by_owner = None
            # On deletion the payload carries True instead of the data node id.
            self.broadcast_core_changed(
                {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True}
            )
  143. def broadcast_core_changed(self, payload: t.Dict[str, t.Any], client_id: t.Optional[str] = None):
  144. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, payload, client_id)
  145. def scenario_refresh(self, scenario_id: t.Optional[str]):
  146. with self.lock:
  147. self.scenario_by_cycle = None
  148. self.data_nodes_by_owner = None
  149. self.broadcast_core_changed({"scenario": scenario_id or True})
    def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
        """Propagate a submission status change to the front-end and to the user callback.

        Nothing happens when the submission id is empty, not readable, or not
        tracked in `client_submission`. Otherwise a payload is built and always
        broadcast from the `finally` clause, even if an exception was raised
        after the payload was created.

        Args:
            submission_id: id of the submission whose status may have changed.
            event: the originating core event, if any; its metadata is merged into
                the details handed to the user callback.
        """
        if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
            return
        submission = None
        new_status = None
        payload: t.Optional[t.Dict[str, t.Any]] = None
        client_id: t.Optional[str] = None
        try:
            # Only submissions registered in client_submission are followed.
            last_status = self.client_submission.get(submission_id)
            if not last_status:
                return

            submission = t.cast(Submission, core_get(submission_id))
            if not submission or not submission.entity_id:
                return

            # From here on, payload is not None and the finally clause will broadcast.
            payload = {}
            new_status = t.cast(SubmissionStatus, submission.submission_status)

            client_id = submission.properties.get("client_id")
            if client_id:
                running_tasks = {}
                with self.gui._get_authorization(client_id):
                    for job in submission.jobs:
                        # submission.jobs may hold Job instances or ids to resolve.
                        job = job if isinstance(job, Job) else t.cast(Job, core_get(job))
                        # Task id -> RUNNING value, PENDING value, or None for any other state.
                        running_tasks[job.task.id] = (
                            SubmissionStatus.RUNNING.value
                            if job.is_running()
                            else SubmissionStatus.PENDING.value
                            if job.is_pending()
                            else None
                        )
                    payload.update(tasks=running_tasks)

                    if last_status is not new_status:
                        # callback
                        submission_name = submission.properties.get("on_submission")
                        if submission_name:
                            self.gui.invoke_callback(
                                client_id,
                                submission_name,
                                [
                                    core_get(submission.id),
                                    {
                                        "submission_status": new_status.name,
                                        "submittable_entity": core_get(submission.entity_id),
                                        **(event.metadata if event else {}),
                                    },
                                ],
                                submission.properties.get("module_context"),
                            )

            with self.submissions_lock:
                # COMPLETED, FAILED and CANCELED stop the tracking; any other status is recorded.
                if new_status in (
                    SubmissionStatus.COMPLETED,
                    SubmissionStatus.FAILED,
                    SubmissionStatus.CANCELED,
                ):
                    self.client_submission.pop(submission_id, None)
                else:
                    self.client_submission[submission_id] = new_status

        except Exception as e:
            _warn(f"Submission ({submission_id}) is not available", e)

        finally:
            if payload is not None:
                payload.update(jobs=True)
                entity_id = submission.entity_id if submission else None
                if entity_id:
                    payload.update(scenario=entity_id)
                    if new_status:
                        payload.update(submission=new_status.value)
                self.broadcast_core_changed(payload, client_id)
  217. def no_change_adapter(self, entity: t.List):
  218. return entity
  219. def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
  220. self.__lazy_start()
  221. try:
  222. if (
  223. isinstance(cycle, Cycle)
  224. and is_readable(cycle.id)
  225. and core_get(cycle.id) is not None
  226. and self.scenario_by_cycle
  227. ):
  228. return [
  229. cycle.id,
  230. cycle.get_simple_label(),
  231. self.get_sorted_scenario_list(self.scenario_by_cycle.get(cycle, []), sorts),
  232. _EntityType.CYCLE.value,
  233. False,
  234. ]
  235. except Exception as e:
  236. _warn(
  237. f"Access to {type(cycle).__name__} " + f"({cycle.id if hasattr(cycle, 'id') else 'No_id'})" + " failed",
  238. e,
  239. )
  240. return None
  241. def scenario_adapter(self, scenario: Scenario):
  242. self.__lazy_start()
  243. if isinstance(scenario, (tuple, list)):
  244. return scenario
  245. try:
  246. if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
  247. return [
  248. scenario.id,
  249. scenario.get_simple_label(),
  250. None,
  251. _EntityType.SCENARIO.value,
  252. scenario.is_primary,
  253. ]
  254. except Exception as e:
  255. _warn(
  256. f"Access to {type(scenario).__name__} "
  257. + f"({scenario.id if hasattr(scenario, 'id') else 'No_id'})"
  258. + " failed",
  259. e,
  260. )
  261. return None
  262. def filter_entities(
  263. self, cycle_scenario: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any, col_fn=None
  264. ):
  265. cycle_scenario[2] = [
  266. e for e in cycle_scenario[2] if _invoke_action(e, col, col_type, is_dn, action, val, col_fn)
  267. ]
  268. return cycle_scenario
  269. def adapt_scenarios(self, cycle: t.List):
  270. cycle[2] = [self.scenario_adapter(e) for e in cycle[2]]
  271. return cycle
  272. def get_sorted_scenario_list(
  273. self,
  274. entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
  275. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  276. ):
  277. if sorts:
  278. sorted_list = entities
  279. for sd in reversed(sorts):
  280. col = sd.get("col", "")
  281. order = sd.get("order", True)
  282. sorted_list = sorted(sorted_list, key=_get_entity_property(col, Scenario, Cycle), reverse=not order)
  283. else:
  284. sorted_list = sorted(entities, key=_get_entity_property("creation_date", Scenario, Cycle))
  285. return [self.cycle_adapter(e, sorts) if isinstance(e, Cycle) else e for e in sorted_list]
  286. def get_filtered_scenario_list(
  287. self,
  288. entities: t.List[t.Union[t.List, Scenario, None]],
  289. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  290. ):
  291. if not filters:
  292. return entities
  293. # filtering
  294. filtered_list = list(entities)
  295. for fd in filters:
  296. col = fd.get("col", "")
  297. is_datanode_prop = _GuiCoreScenarioProperties.is_datanode_property(col)
  298. col_type = fd.get("type", "no type")
  299. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  300. val = fd.get("value")
  301. action = fd.get("action", "")
  302. customs = CustomScenarioFilter._get_custom(col)
  303. if customs:
  304. with self.gui._set_locals_context(customs[0] or None):
  305. fn = self.gui._get_user_function(customs[1])
  306. if callable(fn):
  307. col = fn
  308. if (
  309. isinstance(col, str)
  310. and next(filter(lambda s: not s.isidentifier(), (col_fn or col).split(".")), False) is True
  311. ):
  312. _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
  313. continue
  314. # level 1 filtering
  315. filtered_list = [
  316. e
  317. for e in filtered_list
  318. if not isinstance(e, Scenario)
  319. or _invoke_action(e, t.cast(str, col), col_type, is_datanode_prop, action, val, col_fn)
  320. ]
  321. # level 2 filtering
  322. filtered_list = [
  323. e
  324. if isinstance(e, Scenario)
  325. else self.filter_entities(
  326. t.cast(list, e), t.cast(str, col), col_type, is_datanode_prop, action, val, col_fn
  327. )
  328. for e in filtered_list
  329. ]
  330. # remove empty cycles
  331. return [e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))]
  332. def get_scenarios(
  333. self,
  334. scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
  335. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  336. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  337. ):
  338. self.__lazy_start()
  339. cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
  340. with self.lock:
  341. # always needed to get scenarios for a cycle in cycle_adapter
  342. if self.scenario_by_cycle is None:
  343. self.scenario_by_cycle = get_cycles_scenarios()
  344. if scenarios is None:
  345. for cycle, c_scenarios in self.scenario_by_cycle.items():
  346. if cycle is None:
  347. cycles_scenarios.extend(c_scenarios)
  348. else:
  349. cycles_scenarios.append(cycle)
  350. if scenarios is not None:
  351. cycles_scenarios = scenarios
  352. adapted_list = self.get_sorted_scenario_list(cycles_scenarios, sorts)
  353. adapted_list = self.get_filtered_scenario_list(adapted_list, filters)
  354. return adapted_list
  355. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  356. self.__lazy_start()
  357. args = payload.get("args")
  358. if args is None or not isinstance(args, list) or len(args) < 2:
  359. return
  360. state.assign(args[0], args[1])
  361. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  362. self.__lazy_start()
  363. if not id or not is_readable(t.cast(ScenarioId, id)):
  364. return None
  365. try:
  366. return core_get(t.cast(ScenarioId, id))
  367. except Exception:
  368. return None
  369. def get_scenario_configs(self):
  370. self.__lazy_start()
  371. with self.lock:
  372. if self.scenario_configs is None:
  373. configs = Config.scenarios
  374. if isinstance(configs, dict):
  375. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  376. return self.scenario_configs
  377. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  378. self.__lazy_start()
  379. args = payload.get("args")
  380. start_idx = 3
  381. if (
  382. args is None
  383. or not isinstance(args, list)
  384. or len(args) < start_idx + 3
  385. or not isinstance(args[start_idx], bool)
  386. or not isinstance(args[start_idx + 1], bool)
  387. or not isinstance(args[start_idx + 2], dict)
  388. ):
  389. return
  390. error_var = t.cast(str, payload.get("error_id"))
  391. update = args[start_idx]
  392. delete = args[start_idx + 1]
  393. data = t.cast(dict, args[start_idx + 2])
  394. with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
  395. scenario = None
  396. user_scenario = None
  397. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  398. if update:
  399. scenario_id = t.cast(ScenarioId, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
  400. if delete:
  401. if not (reason := is_deletable(scenario_id)):
  402. state.assign(error_var, f"Scenario. {scenario_id} is not deletable: {_get_reason(reason)}.")
  403. return
  404. try:
  405. core_delete(scenario_id)
  406. except Exception as e:
  407. state.assign(error_var, f"Error deleting Scenario. {e}")
  408. else:
  409. if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
  410. return
  411. scenario = core_get(scenario_id)
  412. else:
  413. if with_dialog:
  414. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  415. scenario_config = Config.scenarios.get(config_id)
  416. if with_dialog and scenario_config is None:
  417. state.assign(error_var, f"Invalid configuration id ({config_id})")
  418. return
  419. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  420. try:
  421. date = parser.parse(date_str) if isinstance(date_str, str) else None
  422. except Exception as e:
  423. state.assign(error_var, f"Invalid date ({date_str}).{e}")
  424. return
  425. else:
  426. scenario_config = None
  427. date = None
  428. scenario_id = None
  429. gui = state.get_gui()
  430. try:
  431. on_creation = args[0] if isinstance(args[0], str) else None
  432. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  433. if callable(on_creation_function) and on_creation:
  434. try:
  435. res = gui._call_function_with_state(
  436. on_creation_function,
  437. [
  438. id,
  439. {
  440. "action": on_creation,
  441. "config": scenario_config,
  442. "date": date,
  443. "label": name,
  444. "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
  445. },
  446. ],
  447. )
  448. if isinstance(res, Scenario):
  449. # everything's fine
  450. user_scenario = res
  451. scenario_id = user_scenario.id
  452. state.assign(error_var, "")
  453. return
  454. if res:
  455. # do not create
  456. state.assign(error_var, f"{res}")
  457. return
  458. except Exception as e: # pragma: no cover
  459. if not gui._call_on_exception(on_creation, e):
  460. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  461. state.assign(
  462. error_var,
  463. f"Error creating Scenario with '{on_creation}()'. {e}",
  464. )
  465. return
  466. elif on_creation is not None:
  467. _warn(f"on_creation(): '{on_creation}' is not a function.")
  468. elif not with_dialog:
  469. if len(Config.scenarios) == 2:
  470. scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
  471. else:
  472. state.assign(
  473. error_var,
  474. "Error creating Scenario: only one scenario config needed "
  475. + f"({len(Config.scenarios) - 1}) found.",
  476. )
  477. return
  478. scenario = create_scenario(scenario_config, date, name) if scenario_config else None
  479. scenario_id = scenario.id if scenario else None
  480. except Exception as e:
  481. state.assign(error_var, f"Error creating Scenario. {e}")
  482. finally:
  483. self.scenario_refresh(scenario_id)
  484. if (scenario or user_scenario) and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
  485. self.gui._update_var(sel_scenario_var, scenario or user_scenario, on_change=args[2])
  486. if scenario:
  487. if not (reason := is_editable(scenario)):
  488. state.assign(error_var, f"Scenario {scenario_id or name} is not editable: {_get_reason(reason)}.")
  489. return
  490. with scenario as sc:
  491. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  492. if props := data.get("properties"):
  493. try:
  494. new_keys = [prop.get("key") for prop in props]
  495. for key in t.cast(dict, sc.properties).keys():
  496. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  497. t.cast(dict, sc.properties).pop(key, None)
  498. for prop in props:
  499. key = prop.get("key")
  500. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  501. sc._properties[key] = prop.get("value")
  502. state.assign(error_var, "")
  503. except Exception as e:
  504. state.assign(error_var, f"Error creating Scenario. {e}")
  505. @staticmethod
  506. def __assign_var(state: State, var_name: t.Optional[str], msg: str):
  507. if var_name:
  508. state.assign(var_name, msg)
    def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
        """Edit a scenario, or one of its sequences, from front-end data.

        `payload["args"][0]` must be a dict. The keys read here are "id" (the
        scenario id), "sequence", "name", "task_ids", "primary" and "del".
        Messages go to the state variable named by `payload["error_id"]`, when set.

        Args:
            state: the state of the client that triggered the action.
            id: not used by this method.
            payload: the action payload described above.
        """
        self.__lazy_start()
        args = payload.get("args")
        if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
            return
        error_var = payload.get("error_id")
        data = t.cast(dict, args[0])
        entity_id = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
        sequence = data.get("sequence")
        if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
            return
        scenario = t.cast(Scenario, core_get(entity_id))
        if scenario:
            try:
                if not sequence:
                    # An empty-string sequence together with a name creates a new sequence.
                    if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
                        scenario.add_sequence(name, t.cast(list, data.get("task_ids")))
                    else:
                        # No sequence involved: the scenario itself is edited.
                        primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
                        if primary is True:
                            if not (reason := is_promotable(scenario)):
                                _GuiCoreContext.__assign_var(
                                    state, error_var, f"Scenario {entity_id} is not promotable: {_get_reason(reason)}."
                                )
                                return
                            set_primary(scenario)
                        self.__edit_properties(scenario, data)
                else:
                    # An existing sequence is removed, or renamed and updated.
                    if data.get("del", False):
                        scenario.remove_sequence(sequence)
                    else:
                        name = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_NAME))
                        if sequence != name:
                            scenario.rename_sequence(sequence, name)
                        if seqEntity := scenario.sequences.get(name):
                            seqEntity.tasks = t.cast(list, data.get("task_ids"))
                            self.__edit_properties(seqEntity, data)
                        else:
                            _GuiCoreContext.__assign_var(
                                state,
                                error_var,
                                f"Sequence {name} is not available in Scenario {entity_id}.",
                            )
                            return
                # Reaching this point means the edit succeeded: clear the error variable.
                _GuiCoreContext.__assign_var(state, error_var, "")
            except Exception as e:
                _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
    def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
        """Submit a scenario, or one of its sequences, and start tracking the submission.

        `payload["args"][0]` must be a dict. The keys read here are "id" (the
        scenario id), "sequence" (optional sequence name) and
        "on_submission_change" (passed to `core_submit` as `on_submission`).
        Messages go to the state variable named by `payload["error_id"]`, when set.

        Args:
            state: the state of the client that triggered the action.
            id: not used by this method.
            payload: the action payload described above.
        """
        self.__lazy_start()
        args = payload.get("args")
        if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
            return
        data = t.cast(dict, args[0])
        error_var = payload.get("error_id")
        try:
            scenario_id = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
            entity = t.cast(Scenario, core_get(scenario_id))
            # When a sequence name is given, the sequence is submitted instead of the scenario.
            if sequence := t.cast(str, data.get("sequence")):
                entity = t.cast(Sequence, entity.sequences.get(sequence))
            if not (reason := is_submittable(entity)):
                _GuiCoreContext.__assign_var(
                    state,
                    error_var,
                    f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable: "
                    + f"{_get_reason(reason)}.",
                )
                return
            if entity:
                on_submission = data.get("on_submission_change")
                submission_entity = core_submit(
                    entity,
                    on_submission=on_submission,
                    client_id=self.gui._get_client_id(),
                    module_context=self.gui._get_locals_context(),
                )
                # Record the submission so that submission_status_callback() follows it.
                with self.submissions_lock:
                    self.client_submission[submission_entity.id] = submission_entity.submission_status
                if Config.core.mode == "development":
                    # In development mode the recorded status is reset to SUBMITTED and
                    # the status callback is invoked directly.
                    with self.submissions_lock:
                        self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
                    self.submission_status_callback(submission_entity.id)
                _GuiCoreContext.__assign_var(state, error_var, "")
        except Exception as e:
            _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  593. def get_filtered_datanode_list(
  594. self,
  595. entities: t.List[t.Union[t.List, DataNode, None]],
  596. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  597. ):
  598. if not filters or not entities:
  599. return entities
  600. # filtering
  601. filtered_list = list(entities)
  602. for fd in filters:
  603. col = fd.get("col", "")
  604. col_type = fd.get("type", "no type")
  605. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  606. val = fd.get("value")
  607. action = fd.get("action", "")
  608. customs = CustomScenarioFilter._get_custom(col)
  609. if customs:
  610. with self.gui._set_locals_context(customs[0] or None):
  611. fn = self.gui._get_user_function(customs[1])
  612. if callable(fn):
  613. col = fn
  614. if (
  615. isinstance(col, str)
  616. and next(filter(lambda s: not s.isidentifier(), (col_fn or col).split(".")), False) is True
  617. ):
  618. _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
  619. continue
  620. # level 1 filtering
  621. filtered_list = [
  622. e
  623. for e in filtered_list
  624. if not isinstance(e, DataNode)
  625. or _invoke_action(e, t.cast(str, col), col_type, False, action, val, col_fn)
  626. ]
  627. # level 3 filtering
  628. filtered_list = [
  629. e
  630. if isinstance(e, DataNode)
  631. else self.filter_entities(d, t.cast(str, col), col_type, False, action, val, col_fn)
  632. for e in filtered_list
  633. for d in t.cast(list, t.cast(list, e)[2])
  634. ]
  635. # remove empty cycles
  636. return [e for e in filtered_list if isinstance(e, DataNode) or (isinstance(e, (tuple, list)) and len(e[2]))]
  637. def get_sorted_datanode_list(
  638. self,
  639. entities: t.Union[
  640. t.List[t.Union[Cycle, Scenario, DataNode]], t.List[t.Union[Scenario, DataNode]], t.List[DataNode]
  641. ],
  642. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  643. adapt_dn=False,
  644. ):
  645. if not entities:
  646. return entities
  647. if sorts:
  648. sorted_list = entities
  649. for sd in reversed(sorts):
  650. col = sd.get("col", "")
  651. order = sd.get("order", True)
  652. sorted_list = sorted(sorted_list, key=_get_entity_property(col, DataNode), reverse=not order)
  653. else:
  654. sorted_list = entities
  655. return [self.data_node_adapter(e, sorts, adapt_dn) for e in sorted_list]
  656. def __do_datanodes_tree(self):
  657. if self.data_nodes_by_owner is None:
  658. self.data_nodes_by_owner = defaultdict(list)
  659. for dn in get_data_nodes():
  660. self.data_nodes_by_owner[dn.owner_id].append(dn)
  661. def get_datanodes_tree(
  662. self,
  663. scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]],
  664. datanodes: t.Optional[t.List[DataNode]],
  665. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  666. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  667. ):
  668. self.__lazy_start()
  669. base_list = []
  670. with self.lock:
  671. self.__do_datanodes_tree()
  672. if datanodes is None:
  673. if scenarios is None:
  674. base_list = (self.data_nodes_by_owner or {}).get(None, []) + (
  675. self.get_scenarios(None, None, None) or []
  676. )
  677. else:
  678. if isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
  679. base_list = list(scenarios)
  680. else:
  681. if self.data_nodes_by_owner:
  682. owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
  683. base_list = [d for owner in owners for d in (self.data_nodes_by_owner).get(owner.id, [])]
  684. else:
  685. base_list = []
  686. else:
  687. base_list = datanodes
  688. adapted_list = self.get_sorted_datanode_list(t.cast(list, base_list), sorts)
  689. return self.get_filtered_datanode_list(t.cast(list, adapted_list), filters)
    def data_node_adapter(
        self,
        data: t.Union[Cycle, Scenario, Sequence, DataNode],
        sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None,
        adapt_dn=True,
    ):
        """Adapt a core entity to the list form used by the data node tree.

        Args:
            data: a Cycle, Scenario, Sequence or DataNode, or an already adapted list.
            sorts: sort descriptions forwarded to `get_sorted_datanode_list()`.
            adapt_dn: when False, a DataNode is returned as is instead of being adapted.

        Returns:
            The adapted list (or the DataNode itself), or None when the entity is
            not readable, does not exist, has no data nodes to show, or an
            exception was raised (a warning is emitted in that last case).

        Raises:
            NotImplementedError: when *data* is a tuple.
        """
        self.__lazy_start()
        if isinstance(data, tuple):
            raise NotImplementedError
        if isinstance(data, list):
            # Already adapted: only sort the children if they are still core entities.
            if data[2] and isinstance(t.cast(list, data[2])[0], (Cycle, Scenario, Sequence, DataNode)):
                data[2] = self.get_sorted_datanode_list(t.cast(list, data[2]), sorts, False)
            return data
        try:
            if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
                if isinstance(data, DataNode):
                    return (
                        [data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False]
                        if adapt_dn
                        else data
                    )

                # The lock only covers the cache load: the calls below re-enter this method.
                with self.lock:
                    self.__do_datanodes_tree()
                if self.data_nodes_by_owner:
                    if isinstance(data, Cycle):
                        # Children: data nodes owned by the cycle, then its scenarios.
                        return [
                            data.id,
                            data.get_simple_label(),
                            self.get_sorted_datanode_list(
                                self.data_nodes_by_owner.get(data.id, [])
                                + (self.scenario_by_cycle or {}).get(data, []),
                                sorts,
                                False,
                            ),
                            _EntityType.CYCLE.value,
                            False,
                        ]
                    elif isinstance(data, Scenario):
                        # Children: data nodes owned by the scenario, then its sequences.
                        return [
                            data.id,
                            data.get_simple_label(),
                            self.get_sorted_datanode_list(
                                t.cast(list, self.data_nodes_by_owner.get(data.id, []) + list(data.sequences.values())),
                                sorts,
                                False,
                            ),
                            _EntityType.SCENARIO.value,
                            data.is_primary,
                        ]
                    elif isinstance(data, Sequence):
                        # A sequence is adapted only when it owns data nodes; the list has four items.
                        if datanodes := self.data_nodes_by_owner.get(data.id):
                            return [
                                data.id,
                                data.get_simple_label(),
                                self.get_sorted_datanode_list(datanodes, sorts, False),
                                _EntityType.SEQUENCE.value,
                            ]
        except Exception as e:
            _warn(
                f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
                e,
            )

        return None
  753. def get_jobs_list(self):
  754. self.__lazy_start()
  755. with self.lock:
  756. if self.jobs_list is None:
  757. self.jobs_list = get_jobs()
  758. return self.jobs_list
  759. def job_adapter(self, job):
  760. self.__lazy_start()
  761. try:
  762. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  763. if isinstance(job, Job):
  764. entity = core_get(job.owner_id)
  765. return (
  766. job.id,
  767. job.get_simple_label(),
  768. [],
  769. entity.id if entity else "",
  770. entity.get_simple_label() if entity else "",
  771. job.submit_id,
  772. job.creation_date,
  773. job.status.value,
  774. _get_reason(is_deletable(job)),
  775. _get_reason(is_readable(job)),
  776. _get_reason(is_editable(job)),
  777. )
  778. except Exception as e:
  779. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  780. return None
  781. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  782. self.__lazy_start()
  783. args = payload.get("args")
  784. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  785. return
  786. data = t.cast(dict, args[0])
  787. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  788. job_action = data.get(_GuiCoreContext.__ACTION)
  789. if job_action and isinstance(job_ids, list):
  790. errs = []
  791. if job_action == "delete":
  792. for job_id in job_ids:
  793. if not (reason := is_readable(job_id)):
  794. errs.append(f"Job {job_id} is not readable: {_get_reason(reason)}.")
  795. continue
  796. if not (reason := is_deletable(job_id)):
  797. errs.append(f"Job {job_id} is not deletable: {_get_reason(reason)}.")
  798. continue
  799. try:
  800. delete_job(core_get(job_id))
  801. except Exception as e:
  802. errs.append(f"Error deleting job. {e}")
  803. elif job_action == "cancel":
  804. for job_id in job_ids:
  805. if not (reason := is_readable(job_id)):
  806. errs.append(f"Job {job_id} is not readable: {_get_reason(reason)}.")
  807. continue
  808. if not (reason := is_editable(job_id)):
  809. errs.append(f"Job {job_id} is not cancelable: {_get_reason(reason)}.")
  810. continue
  811. try:
  812. cancel_job(job_id)
  813. except Exception as e:
  814. errs.append(f"Error canceling job. {e}")
  815. _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(errs) if errs else "")
  816. def get_job_details(self, job_id: t.Optional[JobId]):
  817. try:
  818. if job_id and is_readable(job_id) and (job := core_get(job_id)) is not None:
  819. if isinstance(job, Job):
  820. entity = core_get(job.owner_id)
  821. return (
  822. job.id,
  823. job.get_simple_label(),
  824. entity.id if entity else "",
  825. entity.get_simple_label() if entity else "",
  826. job.submit_id,
  827. job.creation_date,
  828. job.status.value,
  829. _get_reason(is_deletable(job)),
  830. ""
  831. if job.execution_duration is None
  832. else str(datetime.timedelta(seconds=job.execution_duration)),
  833. [] if job.stacktrace is None else job.stacktrace,
  834. )
  835. except Exception as e:
  836. _warn(f"Access to job ({job_id}) failed", e)
  837. return None
  838. def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
  839. self.__lazy_start()
  840. args = payload.get("args")
  841. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  842. return
  843. error_var = payload.get("error_id")
  844. data = t.cast(dict, args[0])
  845. entity_id = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
  846. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  847. return
  848. entity = t.cast(DataNode, core_get(entity_id))
  849. if isinstance(entity, DataNode):
  850. try:
  851. self.__edit_properties(entity, data)
  852. _GuiCoreContext.__assign_var(state, error_var, "")
  853. except Exception as e:
  854. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Data node. {e}")
  855. def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
  856. self.__lazy_start()
  857. args = payload.get("args")
  858. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  859. return
  860. data = t.cast(dict, args[0])
  861. error_var = payload.get("error_id")
  862. entity_id = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
  863. if not self.__check_readable_editable(state, entity_id, "Data node", error_var):
  864. return
  865. lock = data.get("lock", True)
  866. entity = t.cast(DataNode, core_get(entity_id))
  867. if isinstance(entity, DataNode):
  868. try:
  869. if lock:
  870. entity.lock_edit(self.gui._get_client_id())
  871. else:
  872. entity.unlock_edit(self.gui._get_client_id())
  873. _GuiCoreContext.__assign_var(state, error_var, "")
  874. except Exception as e:
  875. _GuiCoreContext.__assign_var(state, error_var, f"Error locking Data node. {e}")
  876. def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
  877. with entity as ent:
  878. if isinstance(ent, Scenario):
  879. tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
  880. if isinstance(tags, (list, tuple)):
  881. ent.tags = set(tags)
  882. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  883. if isinstance(name, str):
  884. if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
  885. setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
  886. else:
  887. ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  888. props = data.get("properties")
  889. if isinstance(props, (list, tuple)):
  890. for prop in props:
  891. key = t.cast(dict, prop).get("key")
  892. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  893. ent.properties[key] = t.cast(dict, prop).get("value")
  894. deleted_props = data.get("deleted_properties")
  895. if isinstance(deleted_props, (list, tuple)):
  896. for prop in deleted_props:
  897. key = t.cast(dict, prop).get("key")
  898. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  899. ent.properties.pop(key, None)
  900. def get_scenarios_for_owner(self, owner_id: str):
  901. self.__lazy_start()
  902. cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
  903. with self.lock:
  904. if self.scenario_by_cycle is None:
  905. self.scenario_by_cycle = get_cycles_scenarios()
  906. if owner_id:
  907. if owner_id == "GLOBAL":
  908. for cycle, scenarios in self.scenario_by_cycle.items():
  909. if cycle is None:
  910. cycles_scenarios.extend(scenarios)
  911. else:
  912. cycles_scenarios.append(cycle)
  913. elif is_readable(t.cast(ScenarioId, owner_id)):
  914. entity = core_get(owner_id)
  915. if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
  916. cycles_scenarios.extend(scenarios_cycle)
  917. elif isinstance(entity, Scenario):
  918. cycles_scenarios.append(entity)
  919. return sorted(cycles_scenarios, key=_get_entity_property("creation_date", Scenario))
  920. def get_data_node_history(self, id: str):
  921. self.__lazy_start()
  922. if id and (dn := core_get(id)) and isinstance(dn, DataNode):
  923. res = []
  924. for e in dn.edits:
  925. job_id = e.get("job_id")
  926. job: t.Optional[Job] = None
  927. if job_id:
  928. if not (reason := is_readable(job_id)):
  929. job_id += f" is not readable: {_get_reason(reason)}."
  930. else:
  931. job = core_get(job_id)
  932. res.append(
  933. (
  934. e.get("timestamp"),
  935. job_id if job_id else e.get("writer_identifier", ""),
  936. f"Execution of task {job.task.get_simple_label()}."
  937. if job and job.task
  938. else e.get("comment", ""),
  939. )
  940. )
  941. return sorted(res, key=lambda r: r[0], reverse=True)
  942. return _DoNotUpdate()
  943. def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
  944. if not (reason := is_readable(t.cast(ScenarioId, id))):
  945. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not readable: {_get_reason(reason)}.")
  946. return False
  947. if not (reason := is_editable(t.cast(ScenarioId, id))):
  948. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not editable: {_get_reason(reason)}.")
  949. return False
  950. return True
  951. def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
  952. self.__lazy_start()
  953. args = payload.get("args")
  954. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  955. return
  956. data = t.cast(dict, args[0])
  957. error_var = payload.get("error_id")
  958. entity_id = t.cast(str, data.get(_GuiCoreContext.__PROP_ENTITY_ID))
  959. if not self.__check_readable_editable(state, entity_id, "Data node", error_var):
  960. return
  961. entity = t.cast(DataNode, core_get(entity_id))
  962. if isinstance(entity, DataNode):
  963. try:
  964. val = t.cast(str, data.get("value"))
  965. entity.write(
  966. parser.parse(val)
  967. if data.get("type") == "date"
  968. else int(val)
  969. if data.get("type") == "int"
  970. else float(val)
  971. if data.get("type") == "float"
  972. else data.get("value"),
  973. comment=t.cast(dict, data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT)),
  974. )
  975. entity.unlock_edit(self.gui._get_client_id())
  976. _GuiCoreContext.__assign_var(state, error_var, "")
  977. except Exception as e:
  978. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Data node value. {e}")
  979. _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id) # this will update the data value
def tabular_data_edit(self, state: State, var_name: str, payload: dict):  # noqa:C901
    """Write one edited cell back into the tabular value of a data node.

    The payload provides ``index`` (row), ``col`` (column), ``value``, an
    optional ``tz`` and a ``user_data`` dict holding the data node id
    (``dn_id``) and an optional comment. The current value is read, the cell
    is replaced, and the whole value is written back. Supported containers are
    pandas DataFrame/Series, dicts (or ``_MapDict``) of lists/tuples, and
    lists/tuples of rows or of scalars.

    Errors are reported in the variable named by ``payload["error_id"]``. The
    variable named by ``payload["data_id"]`` is reassigned at the end, except
    when the readable/editable check fails.
    """
    self.__lazy_start()
    error_var = payload.get("error_id")
    user_data = payload.get("user_data", {})
    dn_id = user_data.get("dn_id")
    if not self.__check_readable_editable(state, dn_id, "Data node", error_var):
        return
    datanode = core_get(dn_id) if dn_id else None
    if isinstance(datanode, DataNode):
        try:
            idx = t.cast(int, payload.get("index"))
            col = t.cast(str, payload.get("col"))
            tz = payload.get("tz")
            # When a time zone is given, the value is parsed as a date, converted
            # to that zone and stored as a naive datetime.
            val = (
                parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
                if tz is not None
                else payload.get("value")
            )
            # user_value = payload.get("user_value")
            data = self.__read_tabular_data(datanode)
            # new_data stays None until an edit has actually been applied.
            new_data: t.Any = None
            if isinstance(data, (pd.DataFrame, pd.Series)):
                if isinstance(data, pd.DataFrame):
                    data.at[idx, col] = val
                elif isinstance(data, pd.Series):
                    data.at[idx] = val
                new_data = data
            elif isinstance(data, (dict, _MapDict)):
                # Dict of columns: the column name selects the sequence, idx the cell.
                row = data.get(col, None)
                data_tuple = False
                if isinstance(row, tuple):
                    # Tuples are immutable: edit a list copy, then store a new tuple.
                    row = list(row)
                    data_tuple = True
                if isinstance(row, list):
                    row[idx] = val
                    if data_tuple:
                        data[col] = tuple(row)
                    new_data = data
                else:
                    _GuiCoreContext.__assign_var(
                        state,
                        error_var,
                        "Error updating Data node: dict values must be list or tuple.",
                    )
            else:
                data_tuple = False
                if isinstance(data, tuple):
                    data_tuple = True
                    data = list(data)
                if isinstance(data, list):
                    row = data[idx]
                    row_tuple = False
                    if isinstance(row, tuple):
                        row = list(row)
                        row_tuple = True
                    if isinstance(row, list):
                        # List of rows: col is the position of the cell within the row.
                        row[int(col)] = val
                        if row_tuple:
                            data[idx] = tuple(row)
                        new_data = data
                    elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
                        # Flat list of scalars: only column "0" can be edited.
                        data[idx] = val
                        new_data = data
                    else:
                        _GuiCoreContext.__assign_var(
                            state,
                            error_var,
                            "Error updating data node: cannot handle multi-column list value.",
                        )
                    if data_tuple and new_data is not None:
                        # Give back the container type that was read.
                        new_data = tuple(new_data)
                else:
                    _GuiCoreContext.__assign_var(
                        state,
                        error_var,
                        "Error updating data node tabular value: type does not support at[] indexer.",
                    )
            if new_data is not None:
                datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
                _GuiCoreContext.__assign_var(state, error_var, "")
        except Exception as e:
            _GuiCoreContext.__assign_var(state, error_var, f"Error updating data node tabular value. {e}")
    _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  1063. def get_data_node_properties(self, id: str):
  1064. self.__lazy_start()
  1065. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  1066. try:
  1067. return (
  1068. (
  1069. (k, f"{v}")
  1070. for k, v in dn._get_user_properties().items()
  1071. if k != _GuiCoreContext.__PROP_ENTITY_NAME
  1072. ),
  1073. )
  1074. except Exception:
  1075. return None
  1076. return None
  1077. def __read_tabular_data(self, datanode: DataNode):
  1078. return datanode.read()
  1079. def get_data_node_tabular_data(self, id: str):
  1080. self.__lazy_start()
  1081. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  1082. if dn.is_ready_for_reading or (dn.edit_in_progress and dn.editor_id == self.gui._get_client_id()):
  1083. try:
  1084. value = self.__read_tabular_data(dn)
  1085. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  1086. return value
  1087. except Exception:
  1088. return None
  1089. return None
  1090. def get_data_node_tabular_columns(self, id: str):
  1091. self.__lazy_start()
  1092. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  1093. if dn.is_ready_for_reading or (dn.edit_in_progress and dn.editor_id == self.gui._get_client_id()):
  1094. try:
  1095. value = self.__read_tabular_data(dn)
  1096. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  1097. return self.gui._tbl_cols(
  1098. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=value
  1099. )
  1100. except Exception:
  1101. return None
  1102. return None
  1103. def get_data_node_chart_config(self, id: str):
  1104. self.__lazy_start()
  1105. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  1106. if dn.is_ready_for_reading or (dn.edit_in_progress and dn.editor_id == self.gui._get_client_id()):
  1107. try:
  1108. return self.gui._chart_conf(
  1109. True,
  1110. True,
  1111. "{}",
  1112. json.dumps({"data": "tabular_data"}),
  1113. tabular_data=self.__read_tabular_data(dn),
  1114. )
  1115. except Exception:
  1116. return None
  1117. return None
  1118. def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
  1119. self.__lazy_start()
  1120. args = payload.get("args")
  1121. if args is None or not isinstance(args, list) or len(args) < 2:
  1122. return
  1123. on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
  1124. if callable(on_action_function):
  1125. try:
  1126. entity = (
  1127. core_get(args[0])
  1128. if (reason := is_readable(t.cast(ScenarioId, args[0])))
  1129. else f"{args[0]} is not readable: {_get_reason(reason)}"
  1130. )
  1131. self.gui._call_function_with_state(
  1132. on_action_function,
  1133. [entity],
  1134. )
  1135. except Exception as e:
  1136. if not self.gui._call_on_exception(args[1], e):
  1137. _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
  1138. elif args[1]:
  1139. _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")
  1140. def on_file_action(self, state: State, id: str, payload: t.Dict[str, t.Any]):
  1141. args = t.cast(list, payload.get("args"))
  1142. act_payload = t.cast(t.Dict[str, str], args[0])
  1143. dn_id = t.cast(DataNodeId, act_payload.get("id"))
  1144. error_id = act_payload.get("error_id", "")
  1145. if reason := is_readable(dn_id):
  1146. try:
  1147. dn = t.cast(_FileDataNodeMixin, core_get(dn_id))
  1148. if act_payload.get("action") == "export":
  1149. path = dn._get_downloadable_path()
  1150. if path:
  1151. self.gui._download(Path(path), dn_id)
  1152. else:
  1153. reason = dn.is_downloadable()
  1154. state.assign(
  1155. error_id,
  1156. "Data unavailable: "
  1157. + ("The data node has never been written." if reason else reason.reasons),
  1158. )
  1159. else:
  1160. checker_name = act_payload.get("upload_check")
  1161. checker = self.gui._get_user_function(checker_name) if checker_name else None
  1162. if not (
  1163. reason := dn._upload(
  1164. act_payload.get("path", ""),
  1165. t.cast(t.Callable[[str, t.Any], bool], checker) if callable(checker) else None,
  1166. )
  1167. ):
  1168. state.assign(error_id, f"Data unavailable: {reason.reasons}")
  1169. except Exception as e:
  1170. state.assign(error_id, f"Data node download error: {e}")
  1171. else:
  1172. state.assign(error_id, reason.reasons)
  1173. def _auth_listener(self, state: State, client_id: t.Optional[str], payload: t.Dict[str, t.Any]):
  1174. self.gui._broadcast(
  1175. _GuiCoreContext._AUTH_CHANGED_NAME,
  1176. payload.get("override", "")
  1177. if (reason := can_create())
  1178. else f"Cannot create scenario: {_get_reason(reason)}",
  1179. client_id,
  1180. )
  1181. def _get_reason(reason: t.Union[bool, ReasonCollection]):
  1182. return reason.reasons if isinstance(reason, ReasonCollection) else " "