_context.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import typing as t
  13. from collections import defaultdict
  14. from datetime import datetime
  15. from numbers import Number
  16. from threading import Lock
  17. try:
  18. import zoneinfo
  19. except ImportError:
  20. from backports import zoneinfo # type: ignore[no-redef]
  21. import pandas as pd
  22. from dateutil import parser
  23. from taipy.config import Config
  24. from taipy.core import (
  25. Cycle,
  26. DataNode,
  27. DataNodeId,
  28. Job,
  29. Scenario,
  30. ScenarioId,
  31. Sequence,
  32. SequenceId,
  33. Submission,
  34. SubmissionId,
  35. cancel_job,
  36. create_scenario,
  37. delete_job,
  38. get_cycles_scenarios,
  39. get_data_nodes,
  40. get_jobs,
  41. is_deletable,
  42. is_editable,
  43. is_promotable,
  44. is_readable,
  45. is_submittable,
  46. set_primary,
  47. )
  48. from taipy.core import delete as core_delete
  49. from taipy.core import get as core_get
  50. from taipy.core import submit as core_submit
  51. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  52. from taipy.core.notification.event import Event, EventOperation
  53. from taipy.core.notification.notifier import Notifier
  54. from taipy.core.submission.submission_status import SubmissionStatus
  55. from taipy.gui import Gui, State
  56. from taipy.gui._warnings import _warn
  57. from taipy.gui.gui import _DoNotUpdate
  58. from ._adapters import (
  59. _EntityType,
  60. _get_datanode_property,
  61. _get_entity_property,
  62. _GuiCoreDatanodeAdapter,
  63. _GuiCoreDatanodeProperties,
  64. _GuiCoreScenarioProperties,
  65. _invoke_action,
  66. )
  67. class _GuiCoreContext(CoreEventConsumerBase):
  68. __PROP_ENTITY_ID = "id"
  69. __PROP_ENTITY_COMMENT = "comment"
  70. __PROP_CONFIG_ID = "config"
  71. __PROP_DATE = "date"
  72. __PROP_ENTITY_NAME = "name"
  73. __PROP_SCENARIO_PRIMARY = "primary"
  74. __PROP_SCENARIO_TAGS = "tags"
  75. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  76. __ACTION = "action"
  77. _CORE_CHANGED_NAME = "core_changed"
  78. def __init__(self, gui: Gui) -> None:
  79. self.gui = gui
  80. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  81. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
  82. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  83. self.jobs_list: t.Optional[t.List[Job]] = None
  84. self.client_submission: t.Dict[str, SubmissionStatus] = {}
  85. # register to taipy core notification
  86. reg_id, reg_queue = Notifier.register()
  87. # locks
  88. self.lock = Lock()
  89. self.submissions_lock = Lock()
  90. # super
  91. super().__init__(reg_id, reg_queue)
  92. self.start()
  93. def process_event(self, event: Event):
  94. if event.entity_type == EventEntityType.SCENARIO:
  95. with self.gui._get_autorization(system=True):
  96. self.scenario_refresh(
  97. event.entity_id
  98. if event.operation != EventOperation.DELETION and is_readable(t.cast(ScenarioId, event.entity_id))
  99. else None
  100. )
  101. elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
  102. sequence = None
  103. try:
  104. with self.gui._get_autorization(system=True):
  105. sequence = (
  106. core_get(event.entity_id)
  107. if event.operation != EventOperation.DELETION
  108. and is_readable(t.cast(SequenceId, event.entity_id))
  109. else None
  110. )
  111. if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
  112. self.gui._broadcast(
  113. _GuiCoreContext._CORE_CHANGED_NAME,
  114. {"scenario": list(sequence.parent_ids)}, # type: ignore
  115. )
  116. except Exception as e:
  117. _warn(f"Access to sequence {event.entity_id} failed", e)
  118. elif event.entity_type == EventEntityType.JOB:
  119. with self.lock:
  120. self.jobs_list = None
  121. elif event.entity_type == EventEntityType.SUBMISSION:
  122. self.submission_status_callback(event.entity_id, event)
  123. elif event.entity_type == EventEntityType.DATA_NODE:
  124. with self.lock:
  125. self.data_nodes_by_owner = None
  126. self.gui._broadcast(
  127. _GuiCoreContext._CORE_CHANGED_NAME,
  128. {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True},
  129. )
  130. def scenario_refresh(self, scenario_id: t.Optional[str]):
  131. with self.lock:
  132. self.scenario_by_cycle = None
  133. self.data_nodes_by_owner = None
  134. self.gui._broadcast(
  135. _GuiCoreContext._CORE_CHANGED_NAME,
  136. {"scenario": scenario_id or True},
  137. )
  138. def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
  139. if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
  140. return
  141. try:
  142. last_status = self.client_submission.get(submission_id)
  143. if not last_status:
  144. return
  145. submission = t.cast(Submission, core_get(submission_id))
  146. if not submission or not submission.entity_id:
  147. return
  148. new_status = submission.submission_status
  149. client_id = submission.properties.get("client_id")
  150. if client_id:
  151. running_tasks = {}
  152. with self.gui._get_autorization(client_id):
  153. for job in submission.jobs:
  154. job = job if isinstance(job, Job) else core_get(job)
  155. running_tasks[job.task.id] = (
  156. SubmissionStatus.RUNNING.value
  157. if job.is_running()
  158. else SubmissionStatus.PENDING.value
  159. if job.is_pending()
  160. else None
  161. )
  162. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"tasks": running_tasks}, client_id)
  163. if last_status != new_status:
  164. # callback
  165. submission_name = submission.properties.get("on_submission")
  166. if submission_name:
  167. self.gui._call_user_callback(
  168. client_id,
  169. submission_name,
  170. [
  171. core_get(submission.id),
  172. {
  173. "submission_status": new_status.name,
  174. "submittable_entity": core_get(submission.entity_id),
  175. **(event.metadata if event else {}),
  176. },
  177. ],
  178. submission.properties.get("module_context"),
  179. )
  180. with self.submissions_lock:
  181. if new_status in (
  182. SubmissionStatus.COMPLETED,
  183. SubmissionStatus.FAILED,
  184. SubmissionStatus.CANCELED,
  185. ):
  186. self.client_submission.pop(submission_id, None)
  187. else:
  188. self.client_submission[submission_id] = new_status
  189. except Exception as e:
  190. _warn(f"Submission ({submission_id}) is not available", e)
  191. finally:
  192. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
  193. def no_change_adapter(self, entity: t.List):
  194. return entity
  195. def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
  196. try:
  197. if (
  198. isinstance(cycle, Cycle)
  199. and is_readable(cycle.id)
  200. and core_get(cycle.id) is not None
  201. and self.scenario_by_cycle
  202. ):
  203. return [
  204. cycle.id,
  205. cycle.get_simple_label(),
  206. self.get_sorted_scenario_list(self.scenario_by_cycle.get(cycle, []), sorts),
  207. _EntityType.CYCLE.value,
  208. False,
  209. ]
  210. except Exception as e:
  211. _warn(
  212. f"Access to {type(cycle).__name__} " + f"({cycle.id if hasattr(cycle, 'id') else 'No_id'})" + " failed",
  213. e,
  214. )
  215. return None
  216. def scenario_adapter(self, scenario: Scenario):
  217. if isinstance(scenario, (tuple, list)):
  218. return scenario
  219. try:
  220. if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
  221. return [
  222. scenario.id,
  223. scenario.get_simple_label(),
  224. None,
  225. _EntityType.SCENARIO.value,
  226. scenario.is_primary,
  227. ]
  228. except Exception as e:
  229. _warn(
  230. f"Access to {type(scenario).__name__} "
  231. + f"({scenario.id if hasattr(scenario, 'id') else 'No_id'})"
  232. + " failed",
  233. e,
  234. )
  235. return None
  236. def filter_entities(
  237. self, cycle_scenario: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any, col_fn=None
  238. ):
  239. cycle_scenario[2] = [
  240. e for e in cycle_scenario[2] if _invoke_action(e, col, col_type, is_dn, action, val, col_fn)
  241. ]
  242. return cycle_scenario
  243. def adapt_scenarios(self, cycle: t.List):
  244. cycle[2] = [self.scenario_adapter(e) for e in cycle[2]]
  245. return cycle
  246. def get_sorted_scenario_list(
  247. self,
  248. entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
  249. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  250. ):
  251. if sorts:
  252. sorted_list = entities
  253. for sd in reversed(sorts):
  254. col = sd.get("col", "")
  255. col = _GuiCoreScenarioProperties.get_col_name(col)
  256. order = sd.get("order", True)
  257. sorted_list = sorted(sorted_list, key=_get_entity_property(col, Scenario), reverse=not order)
  258. else:
  259. sorted_list = sorted(entities, key=_get_entity_property("creation_date", Scenario))
  260. return [self.cycle_adapter(e, sorts) if isinstance(e, Cycle) else e for e in sorted_list]
  261. def get_filtered_scenario_list(
  262. self,
  263. entities: t.List[t.Union[t.List, Scenario]],
  264. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  265. ):
  266. if not filters:
  267. return entities
  268. # filtering
  269. filtered_list = list(entities)
  270. for fd in filters:
  271. col = fd.get("col", "")
  272. is_datanode_prop = _get_datanode_property(col) is not None
  273. col_type = _GuiCoreScenarioProperties.get_type(col)
  274. col = _GuiCoreScenarioProperties.get_col_name(col)
  275. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  276. val = fd.get("value")
  277. action = fd.get("action", "")
  278. # level 1 filtering
  279. filtered_list = [
  280. e
  281. for e in filtered_list
  282. if not isinstance(e, Scenario)
  283. or _invoke_action(e, col, col_type, is_datanode_prop, action, val, col_fn)
  284. ]
  285. # level 2 filtering
  286. filtered_list = [
  287. e
  288. if isinstance(e, Scenario)
  289. else self.filter_entities(e, col, col_type, is_datanode_prop, action, val, col_fn)
  290. for e in filtered_list
  291. ]
  292. # remove empty cycles
  293. return [e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))]
  294. def get_scenarios(
  295. self,
  296. scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
  297. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  298. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  299. ):
  300. cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
  301. with self.lock:
  302. # always needed to get scenarios for a cycle in cycle_adapter
  303. if self.scenario_by_cycle is None:
  304. self.scenario_by_cycle = get_cycles_scenarios()
  305. if scenarios is None:
  306. for cycle, c_scenarios in self.scenario_by_cycle.items():
  307. if cycle is None:
  308. cycles_scenarios.extend(c_scenarios)
  309. else:
  310. cycles_scenarios.append(cycle)
  311. if scenarios is not None:
  312. cycles_scenarios = scenarios
  313. adapted_list = self.get_sorted_scenario_list(cycles_scenarios, sorts)
  314. adapted_list = self.get_filtered_scenario_list(adapted_list, filters)
  315. return adapted_list
  316. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  317. args = payload.get("args")
  318. if args is None or not isinstance(args, list) or len(args) < 2:
  319. return
  320. state.assign(args[0], args[1])
  321. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  322. if not id or not is_readable(t.cast(ScenarioId, id)):
  323. return None
  324. try:
  325. return core_get(t.cast(ScenarioId, id))
  326. except Exception:
  327. return None
  328. def get_scenario_configs(self):
  329. with self.lock:
  330. if self.scenario_configs is None:
  331. configs = Config.scenarios
  332. if isinstance(configs, dict):
  333. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  334. return self.scenario_configs
  335. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  336. args = payload.get("args")
  337. start_idx = 2
  338. if (
  339. args is None
  340. or not isinstance(args, list)
  341. or len(args) < start_idx + 3
  342. or not isinstance(args[start_idx], bool)
  343. or not isinstance(args[start_idx + 1], bool)
  344. or not isinstance(args[start_idx + 2], dict)
  345. ):
  346. return
  347. error_var = payload.get("error_id")
  348. update = args[start_idx]
  349. delete = args[start_idx + 1]
  350. data = args[start_idx + 2]
  351. with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
  352. scenario = None
  353. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  354. if update:
  355. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  356. if delete:
  357. if not is_deletable(scenario_id):
  358. state.assign(error_var, f"Scenario. {scenario_id} is not deletable.")
  359. return
  360. try:
  361. core_delete(scenario_id)
  362. except Exception as e:
  363. state.assign(error_var, f"Error deleting Scenario. {e}")
  364. else:
  365. if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
  366. return
  367. scenario = core_get(scenario_id)
  368. else:
  369. if with_dialog:
  370. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  371. scenario_config = Config.scenarios.get(config_id)
  372. if with_dialog and scenario_config is None:
  373. state.assign(error_var, f"Invalid configuration id ({config_id})")
  374. return
  375. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  376. try:
  377. date = parser.parse(date_str) if isinstance(date_str, str) else None
  378. except Exception as e:
  379. state.assign(error_var, f"Invalid date ({date_str}).{e}")
  380. return
  381. else:
  382. scenario_config = None
  383. date = None
  384. scenario_id = None
  385. try:
  386. gui = state.get_gui()
  387. on_creation = args[0] if isinstance(args[0], str) else None
  388. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  389. if callable(on_creation_function):
  390. try:
  391. res = gui._call_function_with_state(
  392. on_creation_function,
  393. [
  394. id,
  395. {
  396. "action": on_creation,
  397. "config": scenario_config,
  398. "date": date,
  399. "label": name,
  400. "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
  401. },
  402. ],
  403. )
  404. if isinstance(res, Scenario):
  405. # everything's fine
  406. scenario_id = res.id
  407. state.assign(error_var, "")
  408. return
  409. if res:
  410. # do not create
  411. state.assign(error_var, f"{res}")
  412. return
  413. except Exception as e: # pragma: no cover
  414. if not gui._call_on_exception(on_creation, e):
  415. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  416. state.assign(
  417. error_var,
  418. f"Error creating Scenario with '{on_creation}()'. {e}",
  419. )
  420. return
  421. elif on_creation is not None:
  422. _warn(f"on_creation(): '{on_creation}' is not a function.")
  423. elif not with_dialog:
  424. if len(Config.scenarios) == 2:
  425. scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
  426. else:
  427. state.assign(
  428. error_var,
  429. "Error creating Scenario: only one scenario config needed "
  430. + f"({len(Config.scenarios) - 1}) found.",
  431. )
  432. return
  433. scenario = create_scenario(scenario_config, date, name)
  434. scenario_id = scenario.id
  435. except Exception as e:
  436. state.assign(error_var, f"Error creating Scenario. {e}")
  437. finally:
  438. self.scenario_refresh(scenario_id)
  439. if scenario and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
  440. try:
  441. var_name, _ = gui._get_real_var_name(sel_scenario_var)
  442. state.assign(var_name, scenario)
  443. except Exception as e: # pragma: no cover
  444. _warn("Can't find value variable name in context", e)
  445. if scenario:
  446. if not is_editable(scenario):
  447. state.assign(error_var, f"Scenario {scenario_id or name} is not editable.")
  448. return
  449. with scenario as sc:
  450. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  451. if props := data.get("properties"):
  452. try:
  453. new_keys = [prop.get("key") for prop in props]
  454. for key in t.cast(dict, sc.properties).keys():
  455. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  456. t.cast(dict, sc.properties).pop(key, None)
  457. for prop in props:
  458. key = prop.get("key")
  459. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  460. sc._properties[key] = prop.get("value")
  461. state.assign(error_var, "")
  462. except Exception as e:
  463. state.assign(error_var, f"Error creating Scenario. {e}")
  464. @staticmethod
  465. def __assign_var(state: State, var_name: t.Optional[str], msg: str):
  466. if var_name:
  467. state.assign(var_name, msg)
  468. def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  469. args = payload.get("args")
  470. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  471. return
  472. error_var = payload.get("error_id")
  473. data = args[0]
  474. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  475. sequence = data.get("sequence")
  476. if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
  477. return
  478. scenario: Scenario = core_get(entity_id)
  479. if scenario:
  480. try:
  481. if not sequence:
  482. if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
  483. scenario.add_sequence(name, data.get("task_ids"))
  484. else:
  485. primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
  486. if primary is True:
  487. if not is_promotable(scenario):
  488. _GuiCoreContext.__assign_var(
  489. state, error_var, f"Scenario {entity_id} is not promotable."
  490. )
  491. return
  492. set_primary(scenario)
  493. self.__edit_properties(scenario, data)
  494. else:
  495. if data.get("del", False):
  496. scenario.remove_sequence(sequence)
  497. else:
  498. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  499. if sequence != name:
  500. scenario.rename_sequence(sequence, name)
  501. if seqEntity := scenario.sequences.get(name):
  502. seqEntity.tasks = data.get("task_ids")
  503. self.__edit_properties(seqEntity, data)
  504. else:
  505. _GuiCoreContext.__assign_var(
  506. state,
  507. error_var,
  508. f"Sequence {name} is not available in Scenario {entity_id}.",
  509. )
  510. return
  511. _GuiCoreContext.__assign_var(state, error_var, "")
  512. except Exception as e:
  513. _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
  514. def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  515. args = payload.get("args")
  516. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  517. return
  518. data = args[0]
  519. error_var = payload.get("error_id")
  520. try:
  521. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  522. entity = core_get(scenario_id)
  523. if sequence := data.get("sequence"):
  524. entity = entity.sequences.get(sequence)
  525. if not (reason := is_submittable(entity)):
  526. _GuiCoreContext.__assign_var(
  527. state,
  528. error_var,
  529. f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable: "
  530. + reason.reasons,
  531. )
  532. return
  533. if entity:
  534. on_submission = data.get("on_submission_change")
  535. submission_entity = core_submit(
  536. entity,
  537. on_submission=on_submission,
  538. client_id=self.gui._get_client_id(),
  539. module_context=self.gui._get_locals_context(),
  540. )
  541. if on_submission:
  542. with self.submissions_lock:
  543. self.client_submission[submission_entity.id] = submission_entity.submission_status
  544. if Config.core.mode == "development":
  545. with self.submissions_lock:
  546. self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
  547. self.submission_status_callback(submission_entity.id)
  548. _GuiCoreContext.__assign_var(state, error_var, "")
  549. except Exception as e:
  550. _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  551. def get_filtered_datanode_list(
  552. self,
  553. entities: t.List[t.Union[t.List, DataNode]],
  554. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  555. ):
  556. if not filters or not entities:
  557. return entities
  558. # filtering
  559. filtered_list = list(entities)
  560. for fd in filters:
  561. col = fd.get("col", "")
  562. col_type = _GuiCoreDatanodeProperties.get_type(col)
  563. col = _GuiCoreDatanodeProperties.get_col_name(col)
  564. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  565. val = fd.get("value")
  566. action = fd.get("action", "")
  567. if isinstance(val, str) and col_type == "date":
  568. val = datetime.fromisoformat(val[:-1])
  569. # level 1 filtering
  570. filtered_list = [
  571. e
  572. for e in filtered_list
  573. if not isinstance(e, DataNode) or _invoke_action(e, col, col_type, False, action, val, col_fn)
  574. ]
  575. # level 3 filtering
  576. filtered_list = [
  577. e if isinstance(e, DataNode) else self.filter_entities(d, col, col_type, False, action, val, col_fn)
  578. for e in filtered_list
  579. for d in e[2]
  580. ]
  581. # remove empty cycles
  582. return [e for e in filtered_list if isinstance(e, DataNode) or (isinstance(e, (tuple, list)) and len(e[2]))]
  583. def get_sorted_datanode_list(
  584. self,
  585. entities: t.Union[
  586. t.List[t.Union[Cycle, Scenario, DataNode]], t.List[t.Union[Scenario, DataNode]], t.List[DataNode]
  587. ],
  588. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  589. adapt_dn=False,
  590. ):
  591. if not entities:
  592. return entities
  593. if sorts:
  594. sorted_list = entities
  595. for sd in reversed(sorts):
  596. col = sd.get("col", "")
  597. col = _GuiCoreDatanodeProperties.get_col_name(col)
  598. order = sd.get("order", True)
  599. sorted_list = sorted(sorted_list, key=_get_entity_property(col, DataNode), reverse=not order)
  600. else:
  601. sorted_list = entities
  602. return [self.data_node_adapter(e, sorts, adapt_dn) for e in sorted_list]
  603. def __do_datanodes_tree(self):
  604. if self.data_nodes_by_owner is None:
  605. self.data_nodes_by_owner = defaultdict(list)
  606. for dn in get_data_nodes():
  607. self.data_nodes_by_owner[dn.owner_id].append(dn)
  608. def get_datanodes_tree(
  609. self,
  610. scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]],
  611. datanodes: t.Optional[t.List[DataNode]],
  612. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  613. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  614. ):
  615. base_list = []
  616. with self.lock:
  617. self.__do_datanodes_tree()
  618. if datanodes is None:
  619. if scenarios is None:
  620. base_list = (self.data_nodes_by_owner or {}).get(None, []) + (
  621. self.get_scenarios(None, None, None) or []
  622. )
  623. else:
  624. if isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
  625. base_list = scenarios
  626. else:
  627. if self.data_nodes_by_owner:
  628. owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
  629. base_list = [d for owner in owners for d in (self.data_nodes_by_owner).get(owner.id, [])]
  630. else:
  631. base_list = []
  632. else:
  633. base_list = datanodes
  634. adapted_list = self.get_sorted_datanode_list(base_list, sorts)
  635. return self.get_filtered_datanode_list(adapted_list, filters)
  636. def data_node_adapter(
  637. self,
  638. data: t.Union[Cycle, Scenario, Sequence, DataNode],
  639. sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None,
  640. adapt_dn=True,
  641. ):
  642. if isinstance(data, tuple):
  643. raise NotImplementedError
  644. if isinstance(data, list):
  645. if data[2] and isinstance(data[2][0], (Cycle, Scenario, Sequence, DataNode)):
  646. data[2] = self.get_sorted_datanode_list(data[2], sorts, adapt_dn)
  647. return data
  648. try:
  649. if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
  650. if isinstance(data, DataNode):
  651. return (
  652. [data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False]
  653. if adapt_dn
  654. else data
  655. )
  656. with self.lock:
  657. self.__do_datanodes_tree()
  658. if self.data_nodes_by_owner:
  659. if isinstance(data, Cycle):
  660. return [
  661. data.id,
  662. data.get_simple_label(),
  663. self.get_sorted_datanode_list(
  664. self.data_nodes_by_owner.get(data.id, [])
  665. + (self.scenario_by_cycle or {}).get(data, []),
  666. sorts,
  667. adapt_dn,
  668. ),
  669. _EntityType.CYCLE.value,
  670. False,
  671. ]
  672. elif isinstance(data, Scenario):
  673. return [
  674. data.id,
  675. data.get_simple_label(),
  676. self.get_sorted_datanode_list(
  677. self.data_nodes_by_owner.get(data.id, []) + list(data.sequences.values()),
  678. sorts,
  679. adapt_dn,
  680. ),
  681. _EntityType.SCENARIO.value,
  682. data.is_primary,
  683. ]
  684. elif isinstance(data, Sequence):
  685. if datanodes := self.data_nodes_by_owner.get(data.id):
  686. return [
  687. data.id,
  688. data.get_simple_label(),
  689. self.get_sorted_datanode_list(datanodes, sorts, adapt_dn),
  690. _EntityType.SEQUENCE.value,
  691. ]
  692. except Exception as e:
  693. _warn(
  694. f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
  695. e,
  696. )
  697. return None
  698. def get_jobs_list(self):
  699. with self.lock:
  700. if self.jobs_list is None:
  701. self.jobs_list = get_jobs()
  702. return self.jobs_list
  703. def job_adapter(self, job):
  704. try:
  705. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  706. if isinstance(job, Job):
  707. entity = core_get(job.owner_id)
  708. return (
  709. job.id,
  710. job.get_simple_label(),
  711. [],
  712. entity.get_simple_label() if entity else "",
  713. entity.id if entity else "",
  714. job.submit_id,
  715. job.creation_date,
  716. job.status.value,
  717. is_deletable(job),
  718. is_readable(job),
  719. is_editable(job),
  720. )
  721. except Exception as e:
  722. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  723. return None
  724. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  725. args = payload.get("args")
  726. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  727. return
  728. data = args[0]
  729. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  730. job_action = data.get(_GuiCoreContext.__ACTION)
  731. if job_action and isinstance(job_ids, list):
  732. errs = []
  733. if job_action == "delete":
  734. for job_id in job_ids:
  735. if not is_readable(job_id):
  736. errs.append(f"Job {job_id} is not readable.")
  737. continue
  738. if not is_deletable(job_id):
  739. errs.append(f"Job {job_id} is not deletable.")
  740. continue
  741. try:
  742. delete_job(core_get(job_id))
  743. except Exception as e:
  744. errs.append(f"Error deleting job. {e}")
  745. elif job_action == "cancel":
  746. for job_id in job_ids:
  747. if not is_readable(job_id):
  748. errs.append(f"Job {job_id} is not readable.")
  749. continue
  750. if not is_editable(job_id):
  751. errs.append(f"Job {job_id} is not cancelable.")
  752. continue
  753. try:
  754. cancel_job(job_id)
  755. except Exception as e:
  756. errs.append(f"Error canceling job. {e}")
  757. _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(errs) if errs else "")
  def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
      """Apply the property edits carried by the payload to a data node.

      ``payload["args"][0]`` must be a dict holding the entity id and the
      edits; otherwise nothing is done. The entity must pass the
      readable/editable check and resolve to a ``DataNode``. The outcome
      (empty string on success, an error message on failure) is handed to
      ``__assign_var`` together with ``payload["error_id"]``.
      """
      args = payload.get("args")
      if not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
          return
      error_var = payload.get("error_id")
      edits = args[0]
      entity_id = edits.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
          return
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          self.__edit_properties(entity, edits)
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode. {e}")
  def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
      """Lock or unlock a data node for edition on behalf of the current client.

      ``payload["args"][0]`` must be a dict holding the entity id and an
      optional ``"lock"`` flag (defaults to ``True``); otherwise nothing is
      done. The entity must pass the readable/editable check and resolve to a
      ``DataNode``. The outcome (empty string on success, an error message on
      failure) is handed to ``__assign_var`` together with
      ``payload["error_id"]``.
      """
      args = payload.get("args")
      if not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
          return
      request = args[0]
      error_var = payload.get("error_id")
      entity_id = request.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "Datanode", error_var):
          return
      lock = request.get("lock", True)
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          # Pick the operation first, then invoke it with the client id.
          toggle = entity.lock_edit if lock else entity.unlock_edit
          toggle(self.gui._get_client_id())
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error locking Datanode. {e}")
  def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
      """Apply the edits described by ``data`` to ``entity``.

      The entity is used as a context manager and every change is made on the
      object it yields. Entries of ``data`` that are honoured:

      - the tags entry (scenarios only): a list/tuple, stored as a set;
      - the name entry: a string, set as an attribute when the entity has
        one of that name, otherwise stored in ``properties``;
      - ``"properties"``: a list/tuple of mappings with ``"key"`` and
        ``"value"``, written into ``properties``;
      - ``"deleted_properties"``: a list/tuple of mappings with ``"key"``,
        removed from ``properties`` (missing keys are ignored).

      Keys listed in ``__ENTITY_PROPS`` are never written or removed through
      the two property lists.

      Exceptions raised while applying the edits propagate to the caller.
      """
      with entity as ent:
          if isinstance(ent, Scenario):
              tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
              if isinstance(tags, (list, tuple)):
                  # Tags are plain strings: build a set. `dict(tags)` would
                  # require key/value pairs and fail on ordinary tag names.
                  ent.tags = set(tags)
          name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
          if isinstance(name, str):
              if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
                  setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
              else:
                  ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
          props = data.get("properties")
          if isinstance(props, (list, tuple)):
              for prop in props:
                  key = prop.get("key")
                  if key and key not in _GuiCoreContext.__ENTITY_PROPS:
                      ent.properties[key] = prop.get("value")
          deleted_props = data.get("deleted_properties")
          if isinstance(deleted_props, (list, tuple)):
              for prop in deleted_props:
                  key = prop.get("key")
                  if key and key not in _GuiCoreContext.__ENTITY_PROPS:
                      ent.properties.pop(key, None)
  def get_scenarios_for_owner(self, owner_id: str):
      """List the scenarios (and cycles) attached to ``owner_id``.

      The scenario-by-cycle mapping is loaded through
      ``get_cycles_scenarios()`` the first time it is needed.

      - ``"GLOBAL"``: every cycle of the mapping, plus the scenarios stored
        under the ``None`` key.
      - a readable entity id: the scenarios mapped to that entity when there
        are some, otherwise the entity itself when it is a ``Scenario``.
      - anything else: nothing.

      Returns:
          The collected entities, sorted with the ``creation_date`` key built
          by ``_get_entity_property``.
      """
      found: t.List[t.Union[Scenario, Cycle]] = []
      with self.lock:
          if self.scenario_by_cycle is None:
              self.scenario_by_cycle = get_cycles_scenarios()
          if owner_id == "GLOBAL":
              for cycle, scenarios in self.scenario_by_cycle.items():
                  if cycle is None:
                      found.extend(scenarios)
                  else:
                      found.append(cycle)
          elif owner_id and is_readable(t.cast(ScenarioId, owner_id)):
              entity = core_get(owner_id)
              in_cycle = self.scenario_by_cycle.get(t.cast(Cycle, entity)) if entity else None
              if in_cycle:
                  found.extend(in_cycle)
              elif isinstance(entity, Scenario):
                  found.append(entity)
      found.sort(key=_get_entity_property("creation_date", Scenario))
      return found
  def get_data_node_history(self, id: str):
      """Build the edit history of a data node, most recent first.

      Each entry is a tuple ``(timestamp, author, description)`` where:

      - ``author`` is the edit's job id (suffixed with ``" not readable"``
        when the job is not readable) or, without a job id, the edit's
        ``writer_identifier``;
      - ``description`` names the executed task when the job is readable and
        has a task, otherwise it is the edit's ``comment``.

      Returns:
          The list of entries sorted by descending timestamp, or a
          ``_DoNotUpdate`` instance when ``id`` does not resolve to a
          ``DataNode``.
      """
      if not id:
          return _DoNotUpdate()
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode):
          return _DoNotUpdate()
      history = []
      for edit in dn.edits:
          job_id = edit.get("job_id")
          job: t.Optional[Job] = None
          if job_id:
              if is_readable(job_id):
                  job = core_get(job_id)
              else:
                  job_id += " not readable"
          author = job_id if job_id else edit.get("writer_identifier", "")
          if job and job.task:
              description = f"Execution of task {job.task.get_simple_label()}."
          else:
              description = edit.get("comment", "")
          history.append((edit.get("timestamp"), author, description))
      history.sort(key=lambda entry: entry[0], reverse=True)
      return history
  def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
      """Tell whether the entity ``id`` is both readable and editable.

      Readability is tested first. When a test fails, a message naming
      ``ent_type`` and ``id`` is handed to ``__assign_var`` together with
      ``var``.

      Returns:
          ``True`` when both tests pass, ``False`` otherwise.
      """
      entity_id = t.cast(ScenarioId, id)
      if is_readable(entity_id):
          if is_editable(entity_id):
              return True
          reason = f"{ent_type} {id} is not editable."
      else:
          reason = f"{ent_type} {id} is not readable."
      _GuiCoreContext.__assign_var(state, var, reason)
      return False
  def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
      """Write a new value into a data node and release its edit lock.

      ``payload["args"][0]`` must be a dict holding the entity id, the
      ``"value"``, an optional ``"type"`` and an optional comment; otherwise
      nothing is done. The value is converted according to ``"type"``:
      ``"date"`` goes through ``parser.parse``, ``"int"`` through ``int``,
      ``"float"`` through ``float``; any other type leaves it untouched.

      The outcome (empty string on success, an error message on failure) is
      handed to ``__assign_var`` together with ``payload["error_id"]``. The
      entity id is then assigned through ``payload["data_id"]`` whether or
      not the write succeeded.
      """
      args = payload.get("args")
      if not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
          return
      request = args[0]
      error_var = payload.get("error_id")
      entity_id = request.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
          return
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          value_type = request.get("type")
          raw_value = request.get("value")
          if value_type == "date":
              new_value = parser.parse(raw_value)
          elif value_type == "int":
              new_value = int(raw_value)
          elif value_type == "float":
              new_value = float(raw_value)
          else:
              new_value = raw_value
          entity.write(new_value, comment=request.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
          entity.unlock_edit(self.gui._get_client_id())
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode value. {e}")
      # this will update the data value
      _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id)
  def tabular_data_edit(self, state: State, var_name: str, payload: dict):
      """Change one cell of a data node's tabular content and write it back.

      The payload provides ``"index"`` (row), ``"col"`` (column), ``"value"``,
      an optional ``"tz"`` and a ``"user_data"`` dict carrying ``"dn_id"`` and
      an optional comment. When ``"tz"`` is set, the value is parsed as a
      date, converted to that time zone and made naive.

      Supported content:

      - ``pd.DataFrame`` / ``pd.Series``: the cell is set through ``.at``;
      - list or tuple of lists/tuples: ``col`` is used as an integer index
        in the row;
      - list or tuple of strings, numbers or date-like objects: only column
        ``"0"`` can be edited and the whole item is replaced.

      Tuples are converted to lists for the edit and restored afterwards.
      Unsupported content produces an error message and nothing is written.
      Messages go through ``__assign_var`` with ``payload["error_id"]``; the
      data node id is finally assigned through ``payload["data_id"]``.
      """
      error_var = payload.get("error_id")
      user_data = payload.get("user_data", {})
      dn_id = user_data.get("dn_id")
      if not self.__check_readable_editable(state, dn_id, "DataNode", error_var):
          return
      datanode = core_get(dn_id) if dn_id else None
      if not isinstance(datanode, DataNode):
          return
      try:
          row_idx = t.cast(int, payload.get("index"))
          col_id = t.cast(str, payload.get("col"))
          tz_name = payload.get("tz")
          if tz_name is None:
              cell_value = payload.get("value")
          else:
              parsed = parser.parse(str(payload.get("value")))
              cell_value = parsed.astimezone(zoneinfo.ZoneInfo(tz_name)).replace(tzinfo=None)
          content = self.__read_tabular_data(datanode)
          updated: t.Any = None
          if isinstance(content, pd.DataFrame):
              content.at[row_idx, col_id] = cell_value
              updated = content
          elif isinstance(content, pd.Series):
              content.at[row_idx] = cell_value
              updated = content
          else:
              content_was_tuple = isinstance(content, tuple)
              if content_was_tuple:
                  content = list(content)
              if isinstance(content, list):
                  row = content[row_idx]
                  row_was_tuple = isinstance(row, tuple)
                  if row_was_tuple:
                      row = list(row)
                  if isinstance(row, list):
                      row[int(col_id)] = cell_value
                      if row_was_tuple:
                          # The list copy was edited: put a tuple back in place.
                          content[row_idx] = tuple(row)
                      updated = content
                  elif col_id == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
                      content[row_idx] = cell_value
                      updated = content
                  else:
                      _GuiCoreContext.__assign_var(
                          state,
                          error_var,
                          "Error updating Datanode: cannot handle multi-column list value.",
                      )
                  if content_was_tuple and updated is not None:
                      updated = tuple(updated)
              else:
                  _GuiCoreContext.__assign_var(
                      state,
                      error_var,
                      "Error updating Datanode tabular value: type does not support at[] indexer.",
                  )
          if updated is not None:
              datanode.write(updated, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
              _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode tabular value. {e}")
      _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  def get_data_node_properties(self, id: str):
      """Expose the user properties of a data node, name excluded.

      Returns:
          A one-element tuple wrapping a generator of ``(key, value)`` pairs,
          each value being rendered with an f-string. ``None`` when ``id`` is
          empty, not readable, does not resolve to a ``DataNode``, or when
          building the result raises.
      """
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode):
          return None
      try:
          pairs = (
              (key, f"{value}")
              for key, value in dn._get_user_properties().items()
              if key != _GuiCoreContext.__PROP_ENTITY_NAME
          )
          return (pairs,)
      except Exception:
          return None
  def __read_tabular_data(self, datanode: DataNode):
      """Return whatever ``datanode.read()`` produces."""
      content = datanode.read()
      return content
  def get_data_node_tabular_data(self, id: str):
      """Read the content of a data node when it qualifies as tabular data.

      Returns:
          The value read from the data node when
          ``_GuiCoreDatanodeAdapter._is_tabular_data`` accepts it. ``None``
          when ``id`` is empty, not readable, does not resolve to a
          ``DataNode`` that is ready for reading, when the value is not
          tabular, or when reading raises.
      """
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode) or not dn.is_ready_for_reading:
          return None
      try:
          value = self.__read_tabular_data(dn)
          return value if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value) else None
      except Exception:
          return None
  def get_data_node_tabular_columns(self, id: str):
      """Compute the table columns description for a data node's tabular content.

      Returns:
          The result of ``self.gui._tbl_cols`` applied to the value read from
          the data node when ``_GuiCoreDatanodeAdapter._is_tabular_data``
          accepts it. ``None`` when ``id`` is empty, not readable, does not
          resolve to a ``DataNode`` that is ready for reading, when the value
          is not tabular, or when anything raises.
      """
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode) or not dn.is_ready_for_reading:
          return None
      try:
          value = self.__read_tabular_data(dn)
          if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
              binding = json.dumps({"data": "tabular_data"})
              return self.gui._tbl_cols(True, True, "{}", binding, tabular_data=value)
      except Exception:
          return None
      return None
  def get_data_node_chart_config(self, id: str):
      """Compute the chart configuration for a data node's content.

      Returns:
          The result of ``self.gui._chart_conf`` applied to the value read
          from the data node. ``None`` when ``id`` is empty, not readable,
          does not resolve to a ``DataNode`` that is ready for reading, or
          when anything raises.
      """
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode) or not dn.is_ready_for_reading:
          return None
      try:
          return self.gui._chart_conf(
              True,
              True,
              "{}",
              json.dumps({"data": "tabular_data"}),
              tabular_data=self.__read_tabular_data(dn),
          )
      except Exception:
          return None
  def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
      """Forward a DAG selection to the user callback named in the payload.

      ``payload["args"]`` must be a list of at least two items: the selected
      entity id and the callback name. The callback receives the entity
      returned by ``core_get`` when the id is readable, or a placeholder
      string otherwise. An exception raised during the call is first offered
      to ``self.gui._call_on_exception`` and reported through ``_warn`` when
      that returns a falsy value. A non-empty name that does not resolve to a
      callable is reported through ``_warn``.
      """
      args = payload.get("args")
      if not isinstance(args, list) or len(args) < 2:
          return
      selected_id = args[0]
      callback_name = args[1]
      callback = self.gui._get_user_function(callback_name) if callback_name else None
      if not callable(callback):
          if callback_name:
              _warn(f"dag.on_action(): Invalid function '{callback_name}()'.")
          return
      try:
          entity = core_get(selected_id) if is_readable(selected_id) else f"unredable({selected_id})"
          self.gui._call_function_with_state(callback, [entity])
      except Exception as e:
          if not self.gui._call_on_exception(callback_name, e):
              _warn(f"dag.on_action(): Exception raised in '{callback_name}()' with '{selected_id}'", e)