_context.py 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import typing as t
  13. from collections import defaultdict
  14. from datetime import datetime
  15. from numbers import Number
  16. from threading import Lock
  17. try:
  18. import zoneinfo
  19. except ImportError:
  20. from backports import zoneinfo # type: ignore[no-redef]
  21. import pandas as pd
  22. from dateutil import parser
  23. from taipy.config import Config
  24. from taipy.core import (
  25. Cycle,
  26. DataNode,
  27. DataNodeId,
  28. Job,
  29. Scenario,
  30. ScenarioId,
  31. Sequence,
  32. SequenceId,
  33. Submission,
  34. SubmissionId,
  35. cancel_job,
  36. create_scenario,
  37. delete_job,
  38. get_cycles_scenarios,
  39. get_data_nodes,
  40. get_jobs,
  41. is_deletable,
  42. is_editable,
  43. is_promotable,
  44. is_readable,
  45. is_submittable,
  46. set_primary,
  47. )
  48. from taipy.core import delete as core_delete
  49. from taipy.core import get as core_get
  50. from taipy.core import submit as core_submit
  51. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  52. from taipy.core.notification.event import Event, EventOperation
  53. from taipy.core.notification.notifier import Notifier
  54. from taipy.core.submission.submission_status import SubmissionStatus
  55. from taipy.core.taipy import can_create
  56. from taipy.gui import Gui, State
  57. from taipy.gui._warnings import _warn
  58. from taipy.gui.gui import _DoNotUpdate
  59. from ._adapters import (
  60. _EntityType,
  61. _get_datanode_property,
  62. _get_entity_property,
  63. _GuiCoreDatanodeAdapter,
  64. _GuiCoreDatanodeProperties,
  65. _GuiCoreScenarioProperties,
  66. _invoke_action,
  67. )
  68. class _GuiCoreContext(CoreEventConsumerBase):
  69. __PROP_ENTITY_ID = "id"
  70. __PROP_ENTITY_COMMENT = "comment"
  71. __PROP_CONFIG_ID = "config"
  72. __PROP_DATE = "date"
  73. __PROP_ENTITY_NAME = "name"
  74. __PROP_SCENARIO_PRIMARY = "primary"
  75. __PROP_SCENARIO_TAGS = "tags"
  76. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  77. __ACTION = "action"
  78. _CORE_CHANGED_NAME = "core_changed"
  79. def __init__(self, gui: Gui) -> None:
  80. self.gui = gui
  81. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  82. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
  83. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  84. self.jobs_list: t.Optional[t.List[Job]] = None
  85. self.client_submission: t.Dict[str, SubmissionStatus] = {}
  86. # register to taipy core notification
  87. reg_id, reg_queue = Notifier.register()
  88. # locks
  89. self.lock = Lock()
  90. self.submissions_lock = Lock()
  91. # super
  92. super().__init__(reg_id, reg_queue)
  93. self.start()
  94. def process_event(self, event: Event):
  95. if event.entity_type == EventEntityType.SCENARIO:
  96. with self.gui._get_autorization(system=True):
  97. self.scenario_refresh(
  98. event.entity_id
  99. if event.operation == EventOperation.DELETION or is_readable(t.cast(ScenarioId, event.entity_id))
  100. else None
  101. )
  102. elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
  103. sequence = None
  104. try:
  105. with self.gui._get_autorization(system=True):
  106. sequence = (
  107. core_get(event.entity_id)
  108. if event.operation != EventOperation.DELETION
  109. and is_readable(t.cast(SequenceId, event.entity_id))
  110. else None
  111. )
  112. if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
  113. self.broadcast_core_changed({"scenario": list(sequence.parent_ids)})
  114. except Exception as e:
  115. _warn(f"Access to sequence {event.entity_id} failed", e)
  116. elif event.entity_type == EventEntityType.JOB:
  117. with self.lock:
  118. self.jobs_list = None
  119. self.broadcast_core_changed({"jobs": event.entity_id})
  120. elif event.entity_type == EventEntityType.SUBMISSION:
  121. self.submission_status_callback(event.entity_id, event)
  122. elif event.entity_type == EventEntityType.DATA_NODE:
  123. with self.lock:
  124. self.data_nodes_by_owner = None
  125. self.broadcast_core_changed(
  126. {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True}
  127. )
  128. def broadcast_core_changed(self, payload: t.Dict[str, t.Any], client_id: t.Optional[str] = None):
  129. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, payload, client_id)
  130. def scenario_refresh(self, scenario_id: t.Optional[str]):
  131. with self.lock:
  132. self.scenario_by_cycle = None
  133. self.data_nodes_by_owner = None
  134. self.broadcast_core_changed({"scenario": scenario_id or True})
  135. def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
  136. if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
  137. return
  138. submission = None
  139. new_status = None
  140. payload: t.Optional[t.Dict[str, t.Any]] = None
  141. client_id: t.Optional[str] = None
  142. try:
  143. last_status = self.client_submission.get(submission_id)
  144. if not last_status:
  145. return
  146. submission = t.cast(Submission, core_get(submission_id))
  147. if not submission or not submission.entity_id:
  148. return
  149. payload = {}
  150. new_status = t.cast(SubmissionStatus, submission.submission_status)
  151. client_id = submission.properties.get("client_id")
  152. if client_id:
  153. running_tasks = {}
  154. with self.gui._get_autorization(client_id):
  155. for job in submission.jobs:
  156. job = job if isinstance(job, Job) else core_get(job)
  157. running_tasks[job.task.id] = (
  158. SubmissionStatus.RUNNING.value
  159. if job.is_running()
  160. else SubmissionStatus.PENDING.value
  161. if job.is_pending()
  162. else None
  163. )
  164. payload.update(tasks=running_tasks)
  165. if last_status != new_status:
  166. # callback
  167. submission_name = submission.properties.get("on_submission")
  168. if submission_name:
  169. self.gui.invoke_callback(
  170. client_id,
  171. submission_name,
  172. [
  173. core_get(submission.id),
  174. {
  175. "submission_status": new_status.name,
  176. "submittable_entity": core_get(submission.entity_id),
  177. **(event.metadata if event else {}),
  178. },
  179. ],
  180. submission.properties.get("module_context"),
  181. )
  182. with self.submissions_lock:
  183. if new_status in (
  184. SubmissionStatus.COMPLETED,
  185. SubmissionStatus.FAILED,
  186. SubmissionStatus.CANCELED,
  187. ):
  188. self.client_submission.pop(submission_id, None)
  189. else:
  190. self.client_submission[submission_id] = new_status
  191. except Exception as e:
  192. _warn(f"Submission ({submission_id}) is not available", e)
  193. finally:
  194. if payload is not None:
  195. payload.update(jobs=True)
  196. entity_id = submission.entity_id if submission else None
  197. if entity_id:
  198. payload.update(scenario=entity_id)
  199. if new_status:
  200. payload.update(submission=new_status.value)
  201. self.broadcast_core_changed(payload, client_id)
  202. def no_change_adapter(self, entity: t.List):
  203. return entity
  204. def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
  205. try:
  206. if (
  207. isinstance(cycle, Cycle)
  208. and is_readable(cycle.id)
  209. and core_get(cycle.id) is not None
  210. and self.scenario_by_cycle
  211. ):
  212. return [
  213. cycle.id,
  214. cycle.get_simple_label(),
  215. self.get_sorted_scenario_list(self.scenario_by_cycle.get(cycle, []), sorts),
  216. _EntityType.CYCLE.value,
  217. False,
  218. ]
  219. except Exception as e:
  220. _warn(
  221. f"Access to {type(cycle).__name__} " + f"({cycle.id if hasattr(cycle, 'id') else 'No_id'})" + " failed",
  222. e,
  223. )
  224. return None
  225. def scenario_adapter(self, scenario: Scenario):
  226. if isinstance(scenario, (tuple, list)):
  227. return scenario
  228. try:
  229. if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
  230. return [
  231. scenario.id,
  232. scenario.get_simple_label(),
  233. None,
  234. _EntityType.SCENARIO.value,
  235. scenario.is_primary,
  236. ]
  237. except Exception as e:
  238. _warn(
  239. f"Access to {type(scenario).__name__} "
  240. + f"({scenario.id if hasattr(scenario, 'id') else 'No_id'})"
  241. + " failed",
  242. e,
  243. )
  244. return None
  245. def filter_entities(
  246. self, cycle_scenario: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any, col_fn=None
  247. ):
  248. cycle_scenario[2] = [
  249. e for e in cycle_scenario[2] if _invoke_action(e, col, col_type, is_dn, action, val, col_fn)
  250. ]
  251. return cycle_scenario
  252. def adapt_scenarios(self, cycle: t.List):
  253. cycle[2] = [self.scenario_adapter(e) for e in cycle[2]]
  254. return cycle
  255. def get_sorted_scenario_list(
  256. self,
  257. entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
  258. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  259. ):
  260. if sorts:
  261. sorted_list = entities
  262. for sd in reversed(sorts):
  263. col = sd.get("col", "")
  264. col = _GuiCoreScenarioProperties.get_col_name(col)
  265. order = sd.get("order", True)
  266. sorted_list = sorted(sorted_list, key=_get_entity_property(col, Scenario), reverse=not order)
  267. else:
  268. sorted_list = sorted(entities, key=_get_entity_property("creation_date", Scenario))
  269. return [self.cycle_adapter(e, sorts) if isinstance(e, Cycle) else e for e in sorted_list]
  270. def get_filtered_scenario_list(
  271. self,
  272. entities: t.List[t.Union[t.List, Scenario]],
  273. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  274. ):
  275. if not filters:
  276. return entities
  277. # filtering
  278. filtered_list = list(entities)
  279. for fd in filters:
  280. col = fd.get("col", "")
  281. is_datanode_prop = _get_datanode_property(col) is not None
  282. col_type = _GuiCoreScenarioProperties.get_type(col)
  283. col = _GuiCoreScenarioProperties.get_col_name(col)
  284. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  285. val = fd.get("value")
  286. action = fd.get("action", "")
  287. # level 1 filtering
  288. filtered_list = [
  289. e
  290. for e in filtered_list
  291. if not isinstance(e, Scenario)
  292. or _invoke_action(e, col, col_type, is_datanode_prop, action, val, col_fn)
  293. ]
  294. # level 2 filtering
  295. filtered_list = [
  296. e
  297. if isinstance(e, Scenario)
  298. else self.filter_entities(e, col, col_type, is_datanode_prop, action, val, col_fn)
  299. for e in filtered_list
  300. ]
  301. # remove empty cycles
  302. return [e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))]
  303. def get_scenarios(
  304. self,
  305. scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
  306. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  307. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  308. ):
  309. cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
  310. with self.lock:
  311. # always needed to get scenarios for a cycle in cycle_adapter
  312. if self.scenario_by_cycle is None:
  313. self.scenario_by_cycle = get_cycles_scenarios()
  314. if scenarios is None:
  315. for cycle, c_scenarios in self.scenario_by_cycle.items():
  316. if cycle is None:
  317. cycles_scenarios.extend(c_scenarios)
  318. else:
  319. cycles_scenarios.append(cycle)
  320. if scenarios is not None:
  321. cycles_scenarios = scenarios
  322. adapted_list = self.get_sorted_scenario_list(cycles_scenarios, sorts)
  323. adapted_list = self.get_filtered_scenario_list(adapted_list, filters)
  324. return adapted_list
  325. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  326. args = payload.get("args")
  327. if args is None or not isinstance(args, list) or len(args) < 2:
  328. return
  329. state.assign(args[0], args[1])
  330. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  331. if not id or not is_readable(t.cast(ScenarioId, id)):
  332. return None
  333. try:
  334. return core_get(t.cast(ScenarioId, id))
  335. except Exception:
  336. return None
  337. def get_scenario_configs(self):
  338. with self.lock:
  339. if self.scenario_configs is None:
  340. configs = Config.scenarios
  341. if isinstance(configs, dict):
  342. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  343. return self.scenario_configs
  344. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  345. args = payload.get("args")
  346. start_idx = 2
  347. if (
  348. args is None
  349. or not isinstance(args, list)
  350. or len(args) < start_idx + 3
  351. or not isinstance(args[start_idx], bool)
  352. or not isinstance(args[start_idx + 1], bool)
  353. or not isinstance(args[start_idx + 2], dict)
  354. ):
  355. return
  356. error_var = payload.get("error_id")
  357. update = args[start_idx]
  358. delete = args[start_idx + 1]
  359. data = args[start_idx + 2]
  360. with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
  361. scenario = None
  362. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  363. if update:
  364. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  365. if delete:
  366. if not is_deletable(scenario_id):
  367. state.assign(error_var, f"Scenario. {scenario_id} is not deletable.")
  368. return
  369. try:
  370. core_delete(scenario_id)
  371. except Exception as e:
  372. state.assign(error_var, f"Error deleting Scenario. {e}")
  373. else:
  374. if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
  375. return
  376. scenario = core_get(scenario_id)
  377. else:
  378. if with_dialog:
  379. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  380. scenario_config = Config.scenarios.get(config_id)
  381. if with_dialog and scenario_config is None:
  382. state.assign(error_var, f"Invalid configuration id ({config_id})")
  383. return
  384. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  385. try:
  386. date = parser.parse(date_str) if isinstance(date_str, str) else None
  387. except Exception as e:
  388. state.assign(error_var, f"Invalid date ({date_str}).{e}")
  389. return
  390. else:
  391. scenario_config = None
  392. date = None
  393. scenario_id = None
  394. try:
  395. gui = state.get_gui()
  396. on_creation = args[0] if isinstance(args[0], str) else None
  397. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  398. if callable(on_creation_function):
  399. try:
  400. res = gui._call_function_with_state(
  401. on_creation_function,
  402. [
  403. id,
  404. {
  405. "action": on_creation,
  406. "config": scenario_config,
  407. "date": date,
  408. "label": name,
  409. "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
  410. },
  411. ],
  412. )
  413. if isinstance(res, Scenario):
  414. # everything's fine
  415. scenario_id = res.id
  416. state.assign(error_var, "")
  417. return
  418. if res:
  419. # do not create
  420. state.assign(error_var, f"{res}")
  421. return
  422. except Exception as e: # pragma: no cover
  423. if not gui._call_on_exception(on_creation, e):
  424. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  425. state.assign(
  426. error_var,
  427. f"Error creating Scenario with '{on_creation}()'. {e}",
  428. )
  429. return
  430. elif on_creation is not None:
  431. _warn(f"on_creation(): '{on_creation}' is not a function.")
  432. elif not with_dialog:
  433. if len(Config.scenarios) == 2:
  434. scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
  435. else:
  436. state.assign(
  437. error_var,
  438. "Error creating Scenario: only one scenario config needed "
  439. + f"({len(Config.scenarios) - 1}) found.",
  440. )
  441. return
  442. scenario = create_scenario(scenario_config, date, name)
  443. scenario_id = scenario.id
  444. except Exception as e:
  445. state.assign(error_var, f"Error creating Scenario. {e}")
  446. finally:
  447. self.scenario_refresh(scenario_id)
  448. if scenario and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
  449. try:
  450. var_name, _ = gui._get_real_var_name(sel_scenario_var)
  451. state.assign(var_name, scenario)
  452. except Exception as e: # pragma: no cover
  453. _warn("Can't find value variable name in context", e)
  454. if scenario:
  455. if not is_editable(scenario):
  456. state.assign(error_var, f"Scenario {scenario_id or name} is not editable.")
  457. return
  458. with scenario as sc:
  459. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  460. if props := data.get("properties"):
  461. try:
  462. new_keys = [prop.get("key") for prop in props]
  463. for key in t.cast(dict, sc.properties).keys():
  464. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  465. t.cast(dict, sc.properties).pop(key, None)
  466. for prop in props:
  467. key = prop.get("key")
  468. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  469. sc._properties[key] = prop.get("value")
  470. state.assign(error_var, "")
  471. except Exception as e:
  472. state.assign(error_var, f"Error creating Scenario. {e}")
  473. @staticmethod
  474. def __assign_var(state: State, var_name: t.Optional[str], msg: str):
  475. if var_name:
  476. state.assign(var_name, msg)
  477. def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  478. args = payload.get("args")
  479. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  480. return
  481. error_var = payload.get("error_id")
  482. data = args[0]
  483. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  484. sequence = data.get("sequence")
  485. if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
  486. return
  487. scenario: Scenario = core_get(entity_id)
  488. if scenario:
  489. try:
  490. if not sequence:
  491. if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
  492. scenario.add_sequence(name, data.get("task_ids"))
  493. else:
  494. primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
  495. if primary is True:
  496. if not is_promotable(scenario):
  497. _GuiCoreContext.__assign_var(
  498. state, error_var, f"Scenario {entity_id} is not promotable."
  499. )
  500. return
  501. set_primary(scenario)
  502. self.__edit_properties(scenario, data)
  503. else:
  504. if data.get("del", False):
  505. scenario.remove_sequence(sequence)
  506. else:
  507. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  508. if sequence != name:
  509. scenario.rename_sequence(sequence, name)
  510. if seqEntity := scenario.sequences.get(name):
  511. seqEntity.tasks = data.get("task_ids")
  512. self.__edit_properties(seqEntity, data)
  513. else:
  514. _GuiCoreContext.__assign_var(
  515. state,
  516. error_var,
  517. f"Sequence {name} is not available in Scenario {entity_id}.",
  518. )
  519. return
  520. _GuiCoreContext.__assign_var(state, error_var, "")
  521. except Exception as e:
  522. _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
  523. def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  524. args = payload.get("args")
  525. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  526. return
  527. data = args[0]
  528. error_var = payload.get("error_id")
  529. try:
  530. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  531. entity = core_get(scenario_id)
  532. if sequence := data.get("sequence"):
  533. entity = entity.sequences.get(sequence)
  534. if not (reason := is_submittable(entity)):
  535. _GuiCoreContext.__assign_var(
  536. state,
  537. error_var,
  538. f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable: "
  539. + reason.reasons,
  540. )
  541. return
  542. if entity:
  543. on_submission = data.get("on_submission_change")
  544. submission_entity = core_submit(
  545. entity,
  546. on_submission=on_submission,
  547. client_id=self.gui._get_client_id(),
  548. module_context=self.gui._get_locals_context(),
  549. )
  550. with self.submissions_lock:
  551. self.client_submission[submission_entity.id] = submission_entity.submission_status
  552. if Config.core.mode == "development":
  553. with self.submissions_lock:
  554. self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
  555. self.submission_status_callback(submission_entity.id)
  556. _GuiCoreContext.__assign_var(state, error_var, "")
  557. except Exception as e:
  558. _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  559. def get_filtered_datanode_list(
  560. self,
  561. entities: t.List[t.Union[t.List, DataNode]],
  562. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  563. ):
  564. if not filters or not entities:
  565. return entities
  566. # filtering
  567. filtered_list = list(entities)
  568. for fd in filters:
  569. col = fd.get("col", "")
  570. col_type = _GuiCoreDatanodeProperties.get_type(col)
  571. col = _GuiCoreDatanodeProperties.get_col_name(col)
  572. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  573. val = fd.get("value")
  574. action = fd.get("action", "")
  575. if isinstance(val, str) and col_type == "date":
  576. val = datetime.fromisoformat(val[:-1])
  577. # level 1 filtering
  578. filtered_list = [
  579. e
  580. for e in filtered_list
  581. if not isinstance(e, DataNode) or _invoke_action(e, col, col_type, False, action, val, col_fn)
  582. ]
  583. # level 3 filtering
  584. filtered_list = [
  585. e if isinstance(e, DataNode) else self.filter_entities(d, col, col_type, False, action, val, col_fn)
  586. for e in filtered_list
  587. for d in e[2]
  588. ]
  589. # remove empty cycles
  590. return [e for e in filtered_list if isinstance(e, DataNode) or (isinstance(e, (tuple, list)) and len(e[2]))]
  591. def get_sorted_datanode_list(
  592. self,
  593. entities: t.Union[
  594. t.List[t.Union[Cycle, Scenario, DataNode]], t.List[t.Union[Scenario, DataNode]], t.List[DataNode]
  595. ],
  596. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  597. adapt_dn=False,
  598. ):
  599. if not entities:
  600. return entities
  601. if sorts:
  602. sorted_list = entities
  603. for sd in reversed(sorts):
  604. col = sd.get("col", "")
  605. col = _GuiCoreDatanodeProperties.get_col_name(col)
  606. order = sd.get("order", True)
  607. sorted_list = sorted(sorted_list, key=_get_entity_property(col, DataNode), reverse=not order)
  608. else:
  609. sorted_list = entities
  610. return [self.data_node_adapter(e, sorts, adapt_dn) for e in sorted_list]
  611. def __do_datanodes_tree(self):
  612. if self.data_nodes_by_owner is None:
  613. self.data_nodes_by_owner = defaultdict(list)
  614. for dn in get_data_nodes():
  615. self.data_nodes_by_owner[dn.owner_id].append(dn)
  616. def get_datanodes_tree(
  617. self,
  618. scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]],
  619. datanodes: t.Optional[t.List[DataNode]],
  620. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  621. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  622. ):
  623. base_list = []
  624. with self.lock:
  625. self.__do_datanodes_tree()
  626. if datanodes is None:
  627. if scenarios is None:
  628. base_list = (self.data_nodes_by_owner or {}).get(None, []) + (
  629. self.get_scenarios(None, None, None) or []
  630. )
  631. else:
  632. if isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
  633. base_list = scenarios
  634. else:
  635. if self.data_nodes_by_owner:
  636. owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
  637. base_list = [d for owner in owners for d in (self.data_nodes_by_owner).get(owner.id, [])]
  638. else:
  639. base_list = []
  640. else:
  641. base_list = datanodes
  642. adapted_list = self.get_sorted_datanode_list(base_list, sorts)
  643. return self.get_filtered_datanode_list(adapted_list, filters)
  644. def data_node_adapter(
  645. self,
  646. data: t.Union[Cycle, Scenario, Sequence, DataNode],
  647. sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None,
  648. adapt_dn=True,
  649. ):
  650. if isinstance(data, tuple):
  651. raise NotImplementedError
  652. if isinstance(data, list):
  653. if data[2] and isinstance(data[2][0], (Cycle, Scenario, Sequence, DataNode)):
  654. data[2] = self.get_sorted_datanode_list(data[2], sorts, False)
  655. return data
  656. try:
  657. if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
  658. if isinstance(data, DataNode):
  659. return (
  660. [data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False]
  661. if adapt_dn
  662. else data
  663. )
  664. with self.lock:
  665. self.__do_datanodes_tree()
  666. if self.data_nodes_by_owner:
  667. if isinstance(data, Cycle):
  668. return [
  669. data.id,
  670. data.get_simple_label(),
  671. self.get_sorted_datanode_list(
  672. self.data_nodes_by_owner.get(data.id, [])
  673. + (self.scenario_by_cycle or {}).get(data, []),
  674. sorts,
  675. False,
  676. ),
  677. _EntityType.CYCLE.value,
  678. False,
  679. ]
  680. elif isinstance(data, Scenario):
  681. return [
  682. data.id,
  683. data.get_simple_label(),
  684. self.get_sorted_datanode_list(
  685. self.data_nodes_by_owner.get(data.id, []) + list(data.sequences.values()),
  686. sorts,
  687. False,
  688. ),
  689. _EntityType.SCENARIO.value,
  690. data.is_primary,
  691. ]
  692. elif isinstance(data, Sequence):
  693. if datanodes := self.data_nodes_by_owner.get(data.id):
  694. return [
  695. data.id,
  696. data.get_simple_label(),
  697. self.get_sorted_datanode_list(datanodes, sorts, False),
  698. _EntityType.SEQUENCE.value,
  699. ]
  700. except Exception as e:
  701. _warn(
  702. f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
  703. e,
  704. )
  705. return None
  706. def get_jobs_list(self):
  707. with self.lock:
  708. if self.jobs_list is None:
  709. self.jobs_list = get_jobs()
  710. return self.jobs_list
  711. def job_adapter(self, job):
  712. try:
  713. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  714. if isinstance(job, Job):
  715. entity = core_get(job.owner_id)
  716. return (
  717. job.id,
  718. job.get_simple_label(),
  719. [],
  720. entity.get_simple_label() if entity else "",
  721. entity.id if entity else "",
  722. job.submit_id,
  723. job.creation_date,
  724. job.status.value,
  725. is_deletable(job),
  726. is_readable(job),
  727. is_editable(job),
  728. )
  729. except Exception as e:
  730. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  731. return None
  732. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  733. args = payload.get("args")
  734. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  735. return
  736. data = args[0]
  737. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  738. job_action = data.get(_GuiCoreContext.__ACTION)
  739. if job_action and isinstance(job_ids, list):
  740. errs = []
  741. if job_action == "delete":
  742. for job_id in job_ids:
  743. if not is_readable(job_id):
  744. errs.append(f"Job {job_id} is not readable.")
  745. continue
  746. if not is_deletable(job_id):
  747. errs.append(f"Job {job_id} is not deletable.")
  748. continue
  749. try:
  750. delete_job(core_get(job_id))
  751. except Exception as e:
  752. errs.append(f"Error deleting job. {e}")
  753. elif job_action == "cancel":
  754. for job_id in job_ids:
  755. if not is_readable(job_id):
  756. errs.append(f"Job {job_id} is not readable.")
  757. continue
  758. if not is_editable(job_id):
  759. errs.append(f"Job {job_id} is not cancelable.")
  760. continue
  761. try:
  762. cancel_job(job_id)
  763. except Exception as e:
  764. errs.append(f"Error canceling job. {e}")
  765. _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(errs) if errs else "")
  766. def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
  767. args = payload.get("args")
  768. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  769. return
  770. error_var = payload.get("error_id")
  771. data = args[0]
  772. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  773. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  774. return
  775. entity: DataNode = core_get(entity_id)
  776. if isinstance(entity, DataNode):
  777. try:
  778. self.__edit_properties(entity, data)
  779. _GuiCoreContext.__assign_var(state, error_var, "")
  780. except Exception as e:
  781. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode. {e}")
  782. def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
  783. args = payload.get("args")
  784. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  785. return
  786. data = args[0]
  787. error_var = payload.get("error_id")
  788. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  789. if not self.__check_readable_editable(state, entity_id, "Datanode", error_var):
  790. return
  791. lock = data.get("lock", True)
  792. entity: DataNode = core_get(entity_id)
  793. if isinstance(entity, DataNode):
  794. try:
  795. if lock:
  796. entity.lock_edit(self.gui._get_client_id())
  797. else:
  798. entity.unlock_edit(self.gui._get_client_id())
  799. _GuiCoreContext.__assign_var(state, error_var, "")
  800. except Exception as e:
  801. _GuiCoreContext.__assign_var(state, error_var, f"Error locking Datanode. {e}")
  802. def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
  803. with entity as ent:
  804. if isinstance(ent, Scenario):
  805. tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
  806. if isinstance(tags, (list, tuple)):
  807. ent.tags = set(tags)
  808. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  809. if isinstance(name, str):
  810. if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
  811. setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
  812. else:
  813. ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  814. props = data.get("properties")
  815. if isinstance(props, (list, tuple)):
  816. for prop in props:
  817. key = prop.get("key")
  818. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  819. ent.properties[key] = prop.get("value")
  820. deleted_props = data.get("deleted_properties")
  821. if isinstance(deleted_props, (list, tuple)):
  822. for prop in deleted_props:
  823. key = prop.get("key")
  824. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  825. ent.properties.pop(key, None)
  826. def get_scenarios_for_owner(self, owner_id: str):
  827. cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
  828. with self.lock:
  829. if self.scenario_by_cycle is None:
  830. self.scenario_by_cycle = get_cycles_scenarios()
  831. if owner_id:
  832. if owner_id == "GLOBAL":
  833. for cycle, scenarios in self.scenario_by_cycle.items():
  834. if cycle is None:
  835. cycles_scenarios.extend(scenarios)
  836. else:
  837. cycles_scenarios.append(cycle)
  838. elif is_readable(t.cast(ScenarioId, owner_id)):
  839. entity = core_get(owner_id)
  840. if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
  841. cycles_scenarios.extend(scenarios_cycle)
  842. elif isinstance(entity, Scenario):
  843. cycles_scenarios.append(entity)
  844. return sorted(cycles_scenarios, key=_get_entity_property("creation_date", Scenario))
  845. def get_data_node_history(self, id: str):
  846. if id and (dn := core_get(id)) and isinstance(dn, DataNode):
  847. res = []
  848. for e in dn.edits:
  849. job_id = e.get("job_id")
  850. job: t.Optional[Job] = None
  851. if job_id:
  852. if not is_readable(job_id):
  853. job_id += " not readable"
  854. else:
  855. job = core_get(job_id)
  856. res.append(
  857. (
  858. e.get("timestamp"),
  859. job_id if job_id else e.get("writer_identifier", ""),
  860. f"Execution of task {job.task.get_simple_label()}."
  861. if job and job.task
  862. else e.get("comment", ""),
  863. )
  864. )
  865. return sorted(res, key=lambda r: r[0], reverse=True)
  866. return _DoNotUpdate()
  867. def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
  868. if not is_readable(t.cast(ScenarioId, id)):
  869. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not readable.")
  870. return False
  871. if not is_editable(t.cast(ScenarioId, id)):
  872. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not editable.")
  873. return False
  874. return True
  875. def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
  876. args = payload.get("args")
  877. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  878. return
  879. data = args[0]
  880. error_var = payload.get("error_id")
  881. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  882. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  883. return
  884. entity: DataNode = core_get(entity_id)
  885. if isinstance(entity, DataNode):
  886. try:
  887. entity.write(
  888. parser.parse(data.get("value"))
  889. if data.get("type") == "date"
  890. else int(data.get("value"))
  891. if data.get("type") == "int"
  892. else float(data.get("value"))
  893. if data.get("type") == "float"
  894. else data.get("value"),
  895. comment=data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT),
  896. )
  897. entity.unlock_edit(self.gui._get_client_id())
  898. _GuiCoreContext.__assign_var(state, error_var, "")
  899. except Exception as e:
  900. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode value. {e}")
  901. _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id) # this will update the data value
  902. def tabular_data_edit(self, state: State, var_name: str, payload: dict):
  903. error_var = payload.get("error_id")
  904. user_data = payload.get("user_data", {})
  905. dn_id = user_data.get("dn_id")
  906. if not self.__check_readable_editable(state, dn_id, "DataNode", error_var):
  907. return
  908. datanode = core_get(dn_id) if dn_id else None
  909. if isinstance(datanode, DataNode):
  910. try:
  911. idx = t.cast(int, payload.get("index"))
  912. col = t.cast(str, payload.get("col"))
  913. tz = payload.get("tz")
  914. val = (
  915. parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
  916. if tz is not None
  917. else payload.get("value")
  918. )
  919. # user_value = payload.get("user_value")
  920. data = self.__read_tabular_data(datanode)
  921. new_data: t.Any = None
  922. if isinstance(data, (pd.DataFrame, pd.Series)):
  923. if isinstance(data, pd.DataFrame):
  924. data.at[idx, col] = val
  925. elif isinstance(data, pd.Series):
  926. data.at[idx] = val
  927. new_data = data
  928. else:
  929. data_tuple = False
  930. if isinstance(data, tuple):
  931. data_tuple = True
  932. data = list(data)
  933. if isinstance(data, list):
  934. row = data[idx]
  935. row_tuple = False
  936. if isinstance(row, tuple):
  937. row = list(row)
  938. row_tuple = True
  939. if isinstance(row, list):
  940. row[int(col)] = val
  941. if row_tuple:
  942. data[idx] = tuple(row)
  943. new_data = data
  944. elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
  945. data[idx] = val
  946. new_data = data
  947. else:
  948. _GuiCoreContext.__assign_var(
  949. state,
  950. error_var,
  951. "Error updating Datanode: cannot handle multi-column list value.",
  952. )
  953. if data_tuple and new_data is not None:
  954. new_data = tuple(new_data)
  955. else:
  956. _GuiCoreContext.__assign_var(
  957. state,
  958. error_var,
  959. "Error updating Datanode tabular value: type does not support at[] indexer.",
  960. )
  961. if new_data is not None:
  962. datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
  963. _GuiCoreContext.__assign_var(state, error_var, "")
  964. except Exception as e:
  965. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode tabular value. {e}")
  966. _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  967. def get_data_node_properties(self, id: str):
  968. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  969. try:
  970. return (
  971. (
  972. (k, f"{v}")
  973. for k, v in dn._get_user_properties().items()
  974. if k != _GuiCoreContext.__PROP_ENTITY_NAME
  975. ),
  976. )
  977. except Exception:
  978. return None
  979. return None
  980. def __read_tabular_data(self, datanode: DataNode):
  981. return datanode.read()
  982. def get_data_node_tabular_data(self, id: str):
  983. if (
  984. id
  985. and is_readable(t.cast(DataNodeId, id))
  986. and (dn := core_get(id))
  987. and isinstance(dn, DataNode)
  988. and dn.is_ready_for_reading
  989. ):
  990. try:
  991. value = self.__read_tabular_data(dn)
  992. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  993. return value
  994. except Exception:
  995. return None
  996. return None
  997. def get_data_node_tabular_columns(self, id: str):
  998. if (
  999. id
  1000. and is_readable(t.cast(DataNodeId, id))
  1001. and (dn := core_get(id))
  1002. and isinstance(dn, DataNode)
  1003. and dn.is_ready_for_reading
  1004. ):
  1005. try:
  1006. value = self.__read_tabular_data(dn)
  1007. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  1008. return self.gui._tbl_cols(
  1009. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=value
  1010. )
  1011. except Exception:
  1012. return None
  1013. return None
  1014. def get_data_node_chart_config(self, id: str):
  1015. if (
  1016. id
  1017. and is_readable(t.cast(DataNodeId, id))
  1018. and (dn := core_get(id))
  1019. and isinstance(dn, DataNode)
  1020. and dn.is_ready_for_reading
  1021. ):
  1022. try:
  1023. return self.gui._chart_conf(
  1024. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
  1025. )
  1026. except Exception:
  1027. return None
  1028. return None
  1029. def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
  1030. args = payload.get("args")
  1031. if args is None or not isinstance(args, list) or len(args) < 2:
  1032. return
  1033. on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
  1034. if callable(on_action_function):
  1035. try:
  1036. entity = core_get(args[0]) if is_readable(args[0]) else f"unredable({args[0]})"
  1037. self.gui._call_function_with_state(
  1038. on_action_function,
  1039. [entity],
  1040. )
  1041. except Exception as e:
  1042. if not self.gui._call_on_exception(args[1], e):
  1043. _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
  1044. elif args[1]:
  1045. _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")
  1046. def get_creation_reason(self):
  1047. return "" if (reason := can_create()) else f"Cannot create scenario: {reason.reasons}"