_context.py 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import math
  13. import typing as t
  14. from collections import defaultdict
  15. from numbers import Number
  16. from threading import Lock
  17. try:
  18. import zoneinfo
  19. except ImportError:
  20. from backports import zoneinfo # type: ignore[no-redef]
  21. import pandas as pd
  22. from dateutil import parser
  23. from taipy.config import Config
  24. from taipy.core import (
  25. Cycle,
  26. DataNode,
  27. DataNodeId,
  28. Job,
  29. Scenario,
  30. ScenarioId,
  31. Sequence,
  32. SequenceId,
  33. Submission,
  34. SubmissionId,
  35. cancel_job,
  36. create_scenario,
  37. delete_job,
  38. get_cycles_scenarios,
  39. get_data_nodes,
  40. get_jobs,
  41. is_deletable,
  42. is_editable,
  43. is_promotable,
  44. is_readable,
  45. is_submittable,
  46. set_primary,
  47. )
  48. from taipy.core import delete as core_delete
  49. from taipy.core import get as core_get
  50. from taipy.core import submit as core_submit
  51. from taipy.core.data._abstract_tabular import _AbstractTabularDataNode
  52. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  53. from taipy.core.notification.event import Event, EventOperation
  54. from taipy.core.notification.notifier import Notifier
  55. from taipy.core.submission.submission_status import SubmissionStatus
  56. from taipy.gui import Gui, State
  57. from taipy.gui._warnings import _warn
  58. from taipy.gui.gui import _DoNotUpdate
  59. from ._adapters import _EntityType
  60. class _GuiCoreContext(CoreEventConsumerBase):
  61. __PROP_ENTITY_ID = "id"
  62. __PROP_ENTITY_COMMENT = "comment"
  63. __PROP_CONFIG_ID = "config"
  64. __PROP_DATE = "date"
  65. __PROP_ENTITY_NAME = "name"
  66. __PROP_SCENARIO_PRIMARY = "primary"
  67. __PROP_SCENARIO_TAGS = "tags"
  68. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  69. __ACTION = "action"
  70. _CORE_CHANGED_NAME = "core_changed"
  71. _SCENARIO_SELECTOR_ERROR_VAR = "gui_core_sc_error"
  72. _SCENARIO_SELECTOR_ID_VAR = "gui_core_sc_id"
  73. _SCENARIO_VIZ_ERROR_VAR = "gui_core_sv_error"
  74. _JOB_SELECTOR_ERROR_VAR = "gui_core_js_error"
  75. _DATANODE_VIZ_ERROR_VAR = "gui_core_dv_error"
  76. _DATANODE_VIZ_OWNER_ID_VAR = "gui_core_dv_owner_id"
  77. _DATANODE_VIZ_HISTORY_ID_VAR = "gui_core_dv_history_id"
  78. _DATANODE_VIZ_DATA_ID_VAR = "gui_core_dv_data_id"
  79. _DATANODE_VIZ_DATA_CHART_ID_VAR = "gui_core_dv_data_chart_id"
  80. _DATANODE_VIZ_DATA_NODE_PROP = "data_node"
  81. _DATANODE_SEL_SCENARIO_PROP = "scenario"
  82. def __init__(self, gui: Gui) -> None:
  83. self.gui = gui
  84. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  85. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], DataNode]] = None
  86. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  87. self.jobs_list: t.Optional[t.List[Job]] = None
  88. self.client_submission: t.Dict[str, SubmissionStatus] = dict()
  89. # register to taipy core notification
  90. reg_id, reg_queue = Notifier.register()
  91. # locks
  92. self.lock = Lock()
  93. self.submissions_lock = Lock()
  94. # super
  95. super().__init__(reg_id, reg_queue)
  96. self.start()
  97. def process_event(self, event: Event):
  98. if event.entity_type == EventEntityType.SCENARIO:
  99. with self.gui._get_autorization(system=True):
  100. self.scenario_refresh(
  101. event.entity_id
  102. if event.operation != EventOperation.DELETION and is_readable(t.cast(ScenarioId, event.entity_id))
  103. else None
  104. )
  105. elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
  106. sequence = None
  107. try:
  108. with self.gui._get_autorization(system=True):
  109. sequence = (
  110. core_get(event.entity_id)
  111. if event.operation != EventOperation.DELETION
  112. and is_readable(t.cast(SequenceId, event.entity_id))
  113. else None
  114. )
  115. if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
  116. self.gui._broadcast(
  117. _GuiCoreContext._CORE_CHANGED_NAME,
  118. {"scenario": [x for x in sequence.parent_ids]}, # type: ignore
  119. )
  120. except Exception as e:
  121. _warn(f"Access to sequence {event.entity_id} failed", e)
  122. elif event.entity_type == EventEntityType.JOB:
  123. with self.lock:
  124. self.jobs_list = None
  125. elif event.entity_type == EventEntityType.SUBMISSION:
  126. self.submission_status_callback(event.entity_id)
  127. elif event.entity_type == EventEntityType.DATA_NODE:
  128. with self.lock:
  129. self.data_nodes_by_owner = None
  130. self.gui._broadcast(
  131. _GuiCoreContext._CORE_CHANGED_NAME,
  132. {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True},
  133. )
  134. def scenario_refresh(self, scenario_id: t.Optional[str]):
  135. with self.lock:
  136. self.scenario_by_cycle = None
  137. self.data_nodes_by_owner = None
  138. self.gui._broadcast(
  139. _GuiCoreContext._CORE_CHANGED_NAME,
  140. {"scenario": scenario_id or True},
  141. )
  142. def submission_status_callback(self, submission_id: t.Optional[str]):
  143. if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
  144. return
  145. try:
  146. last_status = self.client_submission.get(submission_id)
  147. if not last_status:
  148. return
  149. submission = t.cast(Submission, core_get(submission_id))
  150. if not submission or not submission.entity_id:
  151. return
  152. new_status = submission.submission_status
  153. client_id = submission.properties.get("client_id")
  154. if client_id:
  155. running_tasks = {}
  156. with self.gui._get_autorization(client_id):
  157. for job in submission.jobs:
  158. job = job if isinstance(job, Job) else core_get(job)
  159. running_tasks[job.task.id] = (
  160. SubmissionStatus.RUNNING.value
  161. if job.is_running()
  162. else SubmissionStatus.PENDING.value
  163. if job.is_pending()
  164. else None
  165. )
  166. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"tasks": running_tasks}, client_id)
  167. if last_status != new_status:
  168. # callback
  169. submission_name = submission.properties.get("on_submission")
  170. if submission_name:
  171. self.gui._call_user_callback(
  172. client_id,
  173. submission_name,
  174. [core_get(submission.entity_id), {"submission_status": new_status.name}],
  175. submission.properties.get("module_context"),
  176. )
  177. with self.submissions_lock:
  178. if new_status in (
  179. SubmissionStatus.COMPLETED,
  180. SubmissionStatus.FAILED,
  181. SubmissionStatus.CANCELED,
  182. ):
  183. self.client_submission.pop(submission_id, None)
  184. else:
  185. self.client_submission[submission_id] = new_status
  186. except Exception as e:
  187. _warn(f"Submission ({submission_id}) is not available", e)
  188. finally:
  189. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
  190. def scenario_adapter(self, scenario_or_cycle):
  191. try:
  192. if (
  193. hasattr(scenario_or_cycle, "id")
  194. and is_readable(scenario_or_cycle.id)
  195. and core_get(scenario_or_cycle.id) is not None
  196. ):
  197. if self.scenario_by_cycle and isinstance(scenario_or_cycle, Cycle):
  198. return (
  199. scenario_or_cycle.id,
  200. scenario_or_cycle.get_simple_label(),
  201. sorted(
  202. self.scenario_by_cycle.get(scenario_or_cycle, []),
  203. key=_GuiCoreContext.get_entity_creation_date_iso,
  204. ),
  205. _EntityType.CYCLE.value,
  206. False,
  207. )
  208. elif isinstance(scenario_or_cycle, Scenario):
  209. return (
  210. scenario_or_cycle.id,
  211. scenario_or_cycle.get_simple_label(),
  212. None,
  213. _EntityType.SCENARIO.value,
  214. scenario_or_cycle.is_primary,
  215. )
  216. except Exception as e:
  217. _warn(
  218. f"Access to {type(scenario_or_cycle)} "
  219. + f"({scenario_or_cycle.id if hasattr(scenario_or_cycle, 'id') else 'No_id'})"
  220. + " failed",
  221. e,
  222. )
  223. return None
  224. def get_scenarios(self):
  225. cycles_scenarios = []
  226. with self.lock:
  227. if self.scenario_by_cycle is None:
  228. self.scenario_by_cycle = get_cycles_scenarios()
  229. for cycle, scenarios in self.scenario_by_cycle.items():
  230. if cycle is None:
  231. cycles_scenarios.extend(scenarios)
  232. else:
  233. cycles_scenarios.append(cycle)
  234. return sorted(cycles_scenarios, key=_GuiCoreContext.get_entity_creation_date_iso)
  235. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  236. args = payload.get("args")
  237. if args is None or not isinstance(args, list) or len(args) == 0:
  238. return
  239. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ID_VAR, args[0])
  240. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  241. if not id or not is_readable(t.cast(ScenarioId, id)):
  242. return None
  243. try:
  244. return core_get(t.cast(ScenarioId, id))
  245. except Exception:
  246. return None
  247. def get_scenario_configs(self):
  248. with self.lock:
  249. if self.scenario_configs is None:
  250. configs = Config.scenarios
  251. if isinstance(configs, dict):
  252. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  253. return self.scenario_configs
  254. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  255. args = payload.get("args")
  256. if (
  257. args is None
  258. or not isinstance(args, list)
  259. or len(args) < 4
  260. or not isinstance(args[1], bool)
  261. or not isinstance(args[2], bool)
  262. or not isinstance(args[3], dict)
  263. ):
  264. return
  265. update = args[1]
  266. delete = args[2]
  267. data = args[3]
  268. with_dialog = True if len(args) < 5 else bool(args[4])
  269. scenario = None
  270. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  271. if update:
  272. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  273. if delete:
  274. if not is_deletable(scenario_id):
  275. state.assign(
  276. _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Scenario. {scenario_id} is not deletable."
  277. )
  278. return
  279. try:
  280. core_delete(scenario_id)
  281. except Exception as e:
  282. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error deleting Scenario. {e}")
  283. else:
  284. if not self.__check_readable_editable(
  285. state, scenario_id, "Scenario", _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR
  286. ):
  287. return
  288. scenario = core_get(scenario_id)
  289. else:
  290. if with_dialog:
  291. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  292. scenario_config = Config.scenarios.get(config_id)
  293. if with_dialog and scenario_config is None:
  294. state.assign(
  295. _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Invalid configuration id ({config_id})"
  296. )
  297. return
  298. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  299. try:
  300. date = parser.parse(date_str) if isinstance(date_str, str) else None
  301. except Exception as e:
  302. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Invalid date ({date_str}).{e}")
  303. return
  304. else:
  305. scenario_config = None
  306. date = None
  307. scenario_id = None
  308. try:
  309. gui: Gui = state._gui
  310. on_creation = args[0] if isinstance(args[0], str) else None
  311. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  312. if callable(on_creation_function):
  313. try:
  314. res = gui._call_function_with_state(
  315. on_creation_function,
  316. [
  317. id,
  318. {
  319. "action": on_creation,
  320. "config": scenario_config,
  321. "date": date,
  322. "label": name,
  323. "properties": {
  324. v.get("key"): v.get("value") for v in data.get("properties", dict())
  325. },
  326. },
  327. ],
  328. )
  329. if isinstance(res, Scenario):
  330. # everything's fine
  331. scenario_id = res.id
  332. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, "")
  333. return
  334. if res:
  335. # do not create
  336. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"{res}")
  337. return
  338. except Exception as e: # pragma: no cover
  339. if not gui._call_on_exception(on_creation, e):
  340. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  341. state.assign(
  342. _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR,
  343. f"Error creating Scenario with '{on_creation}()'. {e}",
  344. )
  345. return
  346. elif on_creation is not None:
  347. _warn(f"on_creation(): '{on_creation}' is not a function.")
  348. elif not with_dialog:
  349. if len(Config.scenarios) == 2:
  350. scenario_config = [sc for k, sc in Config.scenarios.items() if k != "default"][0]
  351. else:
  352. state.assign(
  353. _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR,
  354. "Error creating Scenario: only one scenario config needed "
  355. + f"({len(Config.scenarios) - 1}) found.",
  356. )
  357. return
  358. scenario = create_scenario(scenario_config, date, name)
  359. scenario_id = scenario.id
  360. except Exception as e:
  361. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error creating Scenario. {e}")
  362. finally:
  363. self.scenario_refresh(scenario_id)
  364. if scenario:
  365. if not is_editable(scenario):
  366. state.assign(
  367. _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Scenario {scenario_id or name} is not editable."
  368. )
  369. return
  370. with scenario as sc:
  371. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  372. if props := data.get("properties"):
  373. try:
  374. new_keys = [prop.get("key") for prop in props]
  375. for key in t.cast(dict, sc.properties).keys():
  376. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  377. t.cast(dict, sc.properties).pop(key, None)
  378. for prop in props:
  379. key = prop.get("key")
  380. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  381. sc._properties[key] = prop.get("value")
  382. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, "")
  383. except Exception as e:
  384. state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error creating Scenario. {e}")
  385. def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  386. args = payload.get("args")
  387. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  388. return
  389. data = args[0]
  390. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  391. sequence = data.get("sequence")
  392. if not self.__check_readable_editable(state, entity_id, "Scenario", _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR):
  393. return
  394. scenario: Scenario = core_get(entity_id)
  395. if scenario:
  396. try:
  397. if not sequence:
  398. if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
  399. scenario.add_sequence(name, data.get("task_ids"))
  400. else:
  401. primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
  402. if primary is True:
  403. if not is_promotable(scenario):
  404. state.assign(
  405. _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Scenario {entity_id} is not promotable."
  406. )
  407. return
  408. set_primary(scenario)
  409. self.__edit_properties(scenario, data)
  410. else:
  411. if data.get("del", False):
  412. scenario.remove_sequence(sequence)
  413. else:
  414. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  415. if sequence != name:
  416. scenario.rename_sequence(sequence, name)
  417. if seqEntity := scenario.sequences.get(name):
  418. seqEntity.tasks = data.get("task_ids")
  419. self.__edit_properties(seqEntity, data)
  420. else:
  421. state.assign(
  422. _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR,
  423. f"Sequence {name} is not available in Scenario {entity_id}.",
  424. )
  425. return
  426. state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
  427. except Exception as e:
  428. state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error updating {type(scenario).__name__}. {e}")
  429. def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  430. args = payload.get("args")
  431. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  432. return
  433. data = args[0]
  434. try:
  435. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  436. entity = core_get(scenario_id)
  437. if sequence := data.get("sequence"):
  438. entity = entity.sequences.get(sequence)
  439. if not is_submittable(entity):
  440. state.assign(
  441. _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR,
  442. f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable.",
  443. )
  444. return
  445. if entity:
  446. on_submission = data.get("on_submission_change")
  447. submission_entity = core_submit(
  448. entity,
  449. on_submission=on_submission,
  450. client_id=self.gui._get_client_id(),
  451. module_context=self.gui._get_locals_context(),
  452. )
  453. if on_submission:
  454. with self.submissions_lock:
  455. self.client_submission[submission_entity.id] = submission_entity.submission_status
  456. if Config.core.mode == "development":
  457. with self.submissions_lock:
  458. self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
  459. self.submission_status_callback(submission_entity.id)
  460. state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
  461. except Exception as e:
  462. state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error submitting entity. {e}")
  463. def __do_datanodes_tree(self):
  464. if self.data_nodes_by_owner is None:
  465. self.data_nodes_by_owner = defaultdict(list)
  466. for dn in get_data_nodes():
  467. self.data_nodes_by_owner[dn.owner_id].append(dn)
  468. def get_datanodes_tree(self, scenario: t.Optional[Scenario]):
  469. with self.lock:
  470. self.__do_datanodes_tree()
  471. return (
  472. self.data_nodes_by_owner.get(scenario.id if scenario else None, []) if self.data_nodes_by_owner else []
  473. ) + (self.get_scenarios() if not scenario else [])
  474. def data_node_adapter(self, data):
  475. try:
  476. if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
  477. if isinstance(data, DataNode):
  478. return (data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False)
  479. else:
  480. with self.lock:
  481. self.__do_datanodes_tree()
  482. if self.data_nodes_by_owner:
  483. if isinstance(data, Cycle):
  484. return (
  485. data.id,
  486. data.get_simple_label(),
  487. self.data_nodes_by_owner[data.id] + self.scenario_by_cycle.get(data, []),
  488. _EntityType.CYCLE.value,
  489. False,
  490. )
  491. elif isinstance(data, Scenario):
  492. return (
  493. data.id,
  494. data.get_simple_label(),
  495. self.data_nodes_by_owner[data.id] + list(data.sequences.values()),
  496. _EntityType.SCENARIO.value,
  497. data.is_primary,
  498. )
  499. elif isinstance(data, Sequence):
  500. if datanodes := self.data_nodes_by_owner.get(data.id):
  501. return (
  502. data.id,
  503. data.get_simple_label(),
  504. datanodes,
  505. _EntityType.SEQUENCE.value,
  506. False,
  507. )
  508. except Exception as e:
  509. _warn(
  510. f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
  511. e,
  512. )
  513. return None
  514. def get_jobs_list(self):
  515. with self.lock:
  516. if self.jobs_list is None:
  517. self.jobs_list = get_jobs()
  518. return self.jobs_list
  519. def job_adapter(self, job):
  520. try:
  521. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  522. if isinstance(job, Job):
  523. entity = core_get(job.owner_id)
  524. return (
  525. job.id,
  526. job.get_simple_label(),
  527. [],
  528. entity.get_simple_label() if entity else "",
  529. entity.id if entity else "",
  530. job.submit_id,
  531. job.creation_date,
  532. job.status.value,
  533. is_deletable(job),
  534. is_readable(job),
  535. is_editable(job),
  536. )
  537. except Exception as e:
  538. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  539. return None
  540. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  541. args = payload.get("args")
  542. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  543. return
  544. data = args[0]
  545. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  546. job_action = data.get(_GuiCoreContext.__ACTION)
  547. if job_action and isinstance(job_ids, list):
  548. errs = []
  549. if job_action == "delete":
  550. for job_id in job_ids:
  551. if not is_readable(job_id):
  552. errs.append(f"Job {job_id} is not readable.")
  553. continue
  554. if not is_deletable(job_id):
  555. errs.append(f"Job {job_id} is not deletable.")
  556. continue
  557. try:
  558. delete_job(core_get(job_id))
  559. except Exception as e:
  560. errs.append(f"Error deleting job. {e}")
  561. elif job_action == "cancel":
  562. for job_id in job_ids:
  563. if not is_readable(job_id):
  564. errs.append(f"Job {job_id} is not readable.")
  565. continue
  566. if not is_editable(job_id):
  567. errs.append(f"Job {job_id} is not cancelable.")
  568. continue
  569. try:
  570. cancel_job(job_id)
  571. except Exception as e:
  572. errs.append(f"Error canceling job. {e}")
  573. state.assign(_GuiCoreContext._JOB_SELECTOR_ERROR_VAR, "<br/>".join(errs) if errs else "")
  574. def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
  575. args = payload.get("args")
  576. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  577. return
  578. data = args[0]
  579. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  580. if not self.__check_readable_editable(state, entity_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
  581. return
  582. entity: DataNode = core_get(entity_id)
  583. if isinstance(entity, DataNode):
  584. try:
  585. self.__edit_properties(entity, data)
  586. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
  587. except Exception as e:
  588. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode. {e}")
  589. def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
  590. args = payload.get("args")
  591. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  592. return
  593. data = args[0]
  594. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  595. if not self.__check_readable_editable(state, entity_id, "Datanode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
  596. return
  597. lock = data.get("lock", True)
  598. entity: DataNode = core_get(entity_id)
  599. if isinstance(entity, DataNode):
  600. try:
  601. if lock:
  602. entity.lock_edit(self.gui._get_client_id())
  603. else:
  604. entity.unlock_edit(self.gui._get_client_id())
  605. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
  606. except Exception as e:
  607. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error locking Datanode. {e}")
  608. def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
  609. with entity as ent:
  610. if isinstance(ent, Scenario):
  611. tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
  612. if isinstance(tags, (list, tuple)):
  613. ent.tags = {t for t in tags}
  614. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  615. if isinstance(name, str):
  616. if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
  617. setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
  618. else:
  619. ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  620. props = data.get("properties")
  621. if isinstance(props, (list, tuple)):
  622. for prop in props:
  623. key = prop.get("key")
  624. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  625. ent.properties[key] = prop.get("value")
  626. deleted_props = data.get("deleted_properties")
  627. if isinstance(deleted_props, (list, tuple)):
  628. for prop in deleted_props:
  629. key = prop.get("key")
  630. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  631. ent.properties.pop(key, None)
  632. @staticmethod
  633. def get_entity_creation_date_iso(entity: t.Union[Scenario, Cycle]):
  634. # we might be comparing naive and aware datetime ISO
  635. return entity.creation_date.isoformat()
  636. def get_scenarios_for_owner(self, owner_id: str):
  637. cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
  638. with self.lock:
  639. if self.scenario_by_cycle is None:
  640. self.scenario_by_cycle = get_cycles_scenarios()
  641. if owner_id:
  642. if owner_id == "GLOBAL":
  643. for cycle, scenarios in self.scenario_by_cycle.items():
  644. if cycle is None:
  645. cycles_scenarios.extend(scenarios)
  646. else:
  647. cycles_scenarios.append(cycle)
  648. elif is_readable(t.cast(ScenarioId, owner_id)):
  649. entity = core_get(owner_id)
  650. if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
  651. cycles_scenarios.extend(scenarios_cycle)
  652. elif isinstance(entity, Scenario):
  653. cycles_scenarios.append(entity)
  654. return sorted(cycles_scenarios, key=_GuiCoreContext.get_entity_creation_date_iso)
  655. def get_data_node_history(self, datanode: DataNode, id: str):
  656. if (
  657. id
  658. and isinstance(datanode, DataNode)
  659. and id == datanode.id
  660. and (dn := core_get(id))
  661. and isinstance(dn, DataNode)
  662. ):
  663. res = []
  664. for e in dn.edits:
  665. job_id = e.get("job_id")
  666. job: t.Optional[Job] = None
  667. if job_id:
  668. if not is_readable(job_id):
  669. job_id += " not readable"
  670. else:
  671. job = core_get(job_id)
  672. res.append(
  673. (
  674. e.get("timestamp"),
  675. job_id if job_id else e.get("writer_identifier", ""),
  676. f"Execution of task {job.task.get_simple_label()}."
  677. if job and job.task
  678. else e.get("comment", ""),
  679. )
  680. )
  681. return sorted(res, key=lambda r: r[0], reverse=True)
  682. return _DoNotUpdate()
  683. @staticmethod
  684. def __is_tabular_data(datanode: DataNode, value: t.Any):
  685. if isinstance(datanode, _AbstractTabularDataNode):
  686. return True
  687. if datanode.is_ready_for_reading:
  688. return isinstance(value, (pd.DataFrame, pd.Series, list, tuple, dict))
  689. return False
  690. def get_data_node_data(self, datanode: DataNode, id: str):
  691. if (
  692. id
  693. and isinstance(datanode, DataNode)
  694. and id == datanode.id
  695. and (dn := core_get(id))
  696. and isinstance(dn, DataNode)
  697. ):
  698. if dn._last_edit_date:
  699. if isinstance(dn, _AbstractTabularDataNode):
  700. return (None, None, True, None)
  701. try:
  702. value = dn.read()
  703. if _GuiCoreContext.__is_tabular_data(dn, value):
  704. return (None, None, True, None)
  705. val_type = (
  706. "date"
  707. if "date" in type(value).__name__
  708. else type(value).__name__
  709. if isinstance(value, Number)
  710. else None
  711. )
  712. if isinstance(value, float):
  713. if math.isnan(value):
  714. value = None
  715. return (
  716. value,
  717. val_type,
  718. None,
  719. None,
  720. )
  721. except Exception as e:
  722. return (None, None, None, f"read data_node: {e}")
  723. return (None, None, None, f"Data unavailable for {dn.get_simple_label()}")
  724. return _DoNotUpdate()
  725. def __check_readable_editable(self, state: State, id: str, ent_type: str, var: str):
  726. if not is_readable(t.cast(ScenarioId, id)):
  727. state.assign(var, f"{ent_type} {id} is not readable.")
  728. return False
  729. if not is_editable(t.cast(ScenarioId, id)):
  730. state.assign(var, f"{ent_type} {id} is not editable.")
  731. return False
  732. return True
  733. def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
  734. args = payload.get("args")
  735. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  736. return
  737. data = args[0]
  738. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  739. if not self.__check_readable_editable(state, entity_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
  740. return
  741. entity: DataNode = core_get(entity_id)
  742. if isinstance(entity, DataNode):
  743. try:
  744. entity.write(
  745. parser.parse(data.get("value"))
  746. if data.get("type") == "date"
  747. else int(data.get("value"))
  748. if data.get("type") == "int"
  749. else float(data.get("value"))
  750. if data.get("type") == "float"
  751. else data.get("value"),
  752. comment=data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT),
  753. )
  754. entity.unlock_edit(self.gui._get_client_id())
  755. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
  756. except Exception as e:
  757. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode value. {e}")
  758. state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, entity_id) # this will update the data value
  759. def tabular_data_edit(self, state: State, var_name: str, payload: dict):
  760. user_data = payload.get("user_data", {})
  761. dn_id = user_data.get("dn_id")
  762. if not self.__check_readable_editable(state, dn_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
  763. return
  764. datanode = core_get(dn_id) if dn_id else None
  765. if isinstance(datanode, DataNode):
  766. try:
  767. idx = t.cast(int, payload.get("index"))
  768. col = t.cast(str, payload.get("col"))
  769. tz = payload.get("tz")
  770. val = (
  771. parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
  772. if tz is not None
  773. else payload.get("value")
  774. )
  775. # user_value = payload.get("user_value")
  776. data = self.__read_tabular_data(datanode)
  777. new_data: t.Any = None
  778. if isinstance(data, (pd.DataFrame, pd.Series)):
  779. if isinstance(data, pd.DataFrame):
  780. data.at[idx, col] = val
  781. elif isinstance(data, pd.Series):
  782. data.at[idx] = val
  783. new_data = data
  784. else:
  785. data_tuple = False
  786. if isinstance(data, tuple):
  787. data_tuple = True
  788. data = list(data)
  789. if isinstance(data, list):
  790. row = data[idx]
  791. row_tuple = False
  792. if isinstance(row, tuple):
  793. row = list(row)
  794. row_tuple = True
  795. if isinstance(row, list):
  796. row[int(col)] = val
  797. if row_tuple:
  798. data[idx] = tuple(row)
  799. new_data = data
  800. elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
  801. data[idx] = val
  802. new_data = data
  803. else:
  804. state.assign(
  805. _GuiCoreContext._DATANODE_VIZ_ERROR_VAR,
  806. "Error updating Datanode: cannot handle multi-column list value.",
  807. )
  808. if data_tuple and new_data is not None:
  809. new_data = tuple(new_data)
  810. else:
  811. state.assign(
  812. _GuiCoreContext._DATANODE_VIZ_ERROR_VAR,
  813. "Error updating Datanode tabular value: type does not support at[] indexer.",
  814. )
  815. if new_data is not None:
  816. datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
  817. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
  818. except Exception as e:
  819. state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode tabular value. {e}")
  820. setattr(state, _GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, dn_id)
  821. def __read_tabular_data(self, datanode: DataNode):
  822. return datanode.read()
  823. def get_data_node_tabular_data(self, datanode: DataNode, id: str):
  824. if (
  825. id
  826. and isinstance(datanode, DataNode)
  827. and id == datanode.id
  828. and is_readable(t.cast(DataNodeId, id))
  829. and (dn := core_get(id))
  830. and isinstance(dn, DataNode)
  831. and dn.is_ready_for_reading
  832. ):
  833. try:
  834. value = self.__read_tabular_data(dn)
  835. if _GuiCoreContext.__is_tabular_data(dn, value):
  836. return value
  837. except Exception:
  838. return None
  839. return None
  840. def get_data_node_tabular_columns(self, datanode: DataNode, id: str):
  841. if (
  842. id
  843. and isinstance(datanode, DataNode)
  844. and id == datanode.id
  845. and is_readable(t.cast(DataNodeId, id))
  846. and (dn := core_get(id))
  847. and isinstance(dn, DataNode)
  848. and dn.is_ready_for_reading
  849. ):
  850. try:
  851. value = self.__read_tabular_data(dn)
  852. if _GuiCoreContext.__is_tabular_data(dn, value):
  853. return self.gui._tbl_cols(
  854. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=value
  855. )
  856. except Exception:
  857. return None
  858. return None
  859. def get_data_node_chart_config(self, datanode: DataNode, id: str):
  860. if (
  861. id
  862. and isinstance(datanode, DataNode)
  863. and id == datanode.id
  864. and is_readable(t.cast(DataNodeId, id))
  865. and (dn := core_get(id))
  866. and isinstance(dn, DataNode)
  867. and dn.is_ready_for_reading
  868. ):
  869. try:
  870. return self.gui._chart_conf(
  871. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
  872. )
  873. except Exception:
  874. return None
  875. return None
  876. def select_id(self, state: State, id: str, payload: t.Dict[str, str]):
  877. args = payload.get("args")
  878. if args is None or not isinstance(args, list) or len(args) == 0 and isinstance(args[0], dict):
  879. return
  880. data = args[0]
  881. if owner_id := data.get("owner_id"):
  882. state.assign(_GuiCoreContext._DATANODE_VIZ_OWNER_ID_VAR, owner_id)
  883. elif history_id := data.get("history_id"):
  884. state.assign(_GuiCoreContext._DATANODE_VIZ_HISTORY_ID_VAR, history_id)
  885. elif data_id := data.get("data_id"):
  886. state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, data_id)
  887. elif chart_id := data.get("chart_id"):
  888. state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_CHART_ID_VAR, chart_id)
  889. def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
  890. args = payload.get("args")
  891. if args is None or not isinstance(args, list) or len(args) < 2:
  892. return
  893. on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
  894. if callable(on_action_function):
  895. try:
  896. entity = core_get(args[0]) if is_readable(args[0]) else f"unredable({args[0]})"
  897. self.gui._call_function_with_state(
  898. on_action_function,
  899. [entity],
  900. )
  901. except Exception as e:
  902. if not self.gui._call_on_exception(args[1], e):
  903. _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
  904. elif args[1]:
  905. _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")