_context.py 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import typing as t
  13. from collections import defaultdict
  14. from datetime import date, datetime
  15. from numbers import Number
  16. from operator import attrgetter
  17. from threading import Lock
  18. try:
  19. import zoneinfo
  20. except ImportError:
  21. from backports import zoneinfo # type: ignore[no-redef]
  22. import pandas as pd
  23. from dateutil import parser
  24. from taipy.config import Config
  25. from taipy.core import (
  26. Cycle,
  27. DataNode,
  28. DataNodeId,
  29. Job,
  30. Scenario,
  31. ScenarioId,
  32. Sequence,
  33. SequenceId,
  34. Submission,
  35. SubmissionId,
  36. cancel_job,
  37. create_scenario,
  38. delete_job,
  39. get_cycles_scenarios,
  40. get_data_nodes,
  41. get_jobs,
  42. is_deletable,
  43. is_editable,
  44. is_promotable,
  45. is_readable,
  46. is_submittable,
  47. set_primary,
  48. )
  49. from taipy.core import delete as core_delete
  50. from taipy.core import get as core_get
  51. from taipy.core import submit as core_submit
  52. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  53. from taipy.core.notification.event import Event, EventOperation
  54. from taipy.core.notification.notifier import Notifier
  55. from taipy.core.submission.submission_status import SubmissionStatus
  56. from taipy.gui import Gui, State
  57. from taipy.gui._warnings import _warn
  58. from taipy.gui.gui import _DoNotUpdate
  59. from ._adapters import (
  60. _EntityType,
  61. _get_datanode_property,
  62. _GuiCoreDatanodeAdapter,
  63. _GuiCoreScenarioProperties,
  64. _invoke_action,
  65. _is_debugging,
  66. )
  67. class _GuiCoreContext(CoreEventConsumerBase):
  68. __PROP_ENTITY_ID = "id"
  69. __PROP_ENTITY_COMMENT = "comment"
  70. __PROP_CONFIG_ID = "config"
  71. __PROP_DATE = "date"
  72. __PROP_ENTITY_NAME = "name"
  73. __PROP_SCENARIO_PRIMARY = "primary"
  74. __PROP_SCENARIO_TAGS = "tags"
  75. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  76. __ACTION = "action"
  77. _CORE_CHANGED_NAME = "core_changed"
  78. def __init__(self, gui: Gui) -> None:
  79. self.gui = gui
  80. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  81. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
  82. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  83. self.jobs_list: t.Optional[t.List[Job]] = None
  84. self.client_submission: t.Dict[str, SubmissionStatus] = {}
  85. # register to taipy core notification
  86. reg_id, reg_queue = Notifier.register()
  87. # locks
  88. self.lock = Lock()
  89. self.submissions_lock = Lock()
  90. # super
  91. super().__init__(reg_id, reg_queue)
  92. self.start()
  93. def process_event(self, event: Event):
  94. if event.entity_type == EventEntityType.SCENARIO:
  95. with self.gui._get_autorization(system=True):
  96. self.scenario_refresh(
  97. event.entity_id
  98. if event.operation != EventOperation.DELETION and is_readable(t.cast(ScenarioId, event.entity_id))
  99. else None
  100. )
  101. elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
  102. sequence = None
  103. try:
  104. with self.gui._get_autorization(system=True):
  105. sequence = (
  106. core_get(event.entity_id)
  107. if event.operation != EventOperation.DELETION
  108. and is_readable(t.cast(SequenceId, event.entity_id))
  109. else None
  110. )
  111. if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
  112. self.gui._broadcast(
  113. _GuiCoreContext._CORE_CHANGED_NAME,
  114. {"scenario": list(sequence.parent_ids)}, # type: ignore
  115. )
  116. except Exception as e:
  117. _warn(f"Access to sequence {event.entity_id} failed", e)
  118. elif event.entity_type == EventEntityType.JOB:
  119. with self.lock:
  120. self.jobs_list = None
  121. elif event.entity_type == EventEntityType.SUBMISSION:
  122. self.submission_status_callback(event.entity_id, event)
  123. elif event.entity_type == EventEntityType.DATA_NODE:
  124. with self.lock:
  125. self.data_nodes_by_owner = None
  126. self.gui._broadcast(
  127. _GuiCoreContext._CORE_CHANGED_NAME,
  128. {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True},
  129. )
  130. def scenario_refresh(self, scenario_id: t.Optional[str]):
  131. with self.lock:
  132. self.scenario_by_cycle = None
  133. self.data_nodes_by_owner = None
  134. self.gui._broadcast(
  135. _GuiCoreContext._CORE_CHANGED_NAME,
  136. {"scenario": scenario_id or True},
  137. )
  138. def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
  139. if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
  140. return
  141. try:
  142. last_status = self.client_submission.get(submission_id)
  143. if not last_status:
  144. return
  145. submission = t.cast(Submission, core_get(submission_id))
  146. if not submission or not submission.entity_id:
  147. return
  148. new_status = submission.submission_status
  149. client_id = submission.properties.get("client_id")
  150. if client_id:
  151. running_tasks = {}
  152. with self.gui._get_autorization(client_id):
  153. for job in submission.jobs:
  154. job = job if isinstance(job, Job) else core_get(job)
  155. running_tasks[job.task.id] = (
  156. SubmissionStatus.RUNNING.value
  157. if job.is_running()
  158. else SubmissionStatus.PENDING.value
  159. if job.is_pending()
  160. else None
  161. )
  162. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"tasks": running_tasks}, client_id)
  163. if last_status != new_status:
  164. # callback
  165. submission_name = submission.properties.get("on_submission")
  166. if submission_name:
  167. self.gui._call_user_callback(
  168. client_id,
  169. submission_name,
  170. [
  171. core_get(submission.id),
  172. {
  173. "submission_status": new_status.name,
  174. "submittable_entity": core_get(submission.entity_id),
  175. **(event.metadata if event else {}),
  176. },
  177. ],
  178. submission.properties.get("module_context"),
  179. )
  180. with self.submissions_lock:
  181. if new_status in (
  182. SubmissionStatus.COMPLETED,
  183. SubmissionStatus.FAILED,
  184. SubmissionStatus.CANCELED,
  185. ):
  186. self.client_submission.pop(submission_id, None)
  187. else:
  188. self.client_submission[submission_id] = new_status
  189. except Exception as e:
  190. _warn(f"Submission ({submission_id}) is not available", e)
  191. finally:
  192. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
  193. def no_change_adapter(self, entity: t.List):
  194. return entity
  195. def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
  196. try:
  197. if (
  198. isinstance(cycle, Cycle)
  199. and is_readable(cycle.id)
  200. and core_get(cycle.id) is not None
  201. and self.scenario_by_cycle
  202. ):
  203. return [
  204. cycle.id,
  205. cycle.get_simple_label(),
  206. self.get_sorted_entity_list(self.scenario_by_cycle.get(cycle, []), sorts),
  207. _EntityType.CYCLE.value,
  208. False,
  209. ]
  210. except Exception as e:
  211. _warn(
  212. f"Access to {type(cycle).__name__} " + f"({cycle.id if hasattr(cycle, 'id') else 'No_id'})" + " failed",
  213. e,
  214. )
  215. return None
  216. def scenario_adapter(self, scenario: Scenario):
  217. if isinstance(scenario, (tuple, list)):
  218. return scenario
  219. try:
  220. if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
  221. return [
  222. scenario.id,
  223. scenario.get_simple_label(),
  224. None,
  225. _EntityType.SCENARIO.value,
  226. scenario.is_primary,
  227. ]
  228. except Exception as e:
  229. _warn(
  230. f"Access to {type(scenario).__name__} "
  231. + f"({scenario.id if hasattr(scenario, 'id') else 'No_id'})"
  232. + " failed",
  233. e,
  234. )
  235. return None
  236. def filter_scenarios(self, cycle: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any):
  237. cycle[2] = [e for e in cycle[2] if _invoke_action(e, col, col_type, is_dn, action, val)]
  238. return cycle
  239. def adapt_scenarios(self, cycle: t.List):
  240. cycle[2] = [self.scenario_adapter(e) for e in cycle[2]]
  241. return cycle
  242. def get_sorted_entity_list(
  243. self,
  244. entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
  245. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  246. ):
  247. if sorts:
  248. sorted_list = entities
  249. for sd in reversed(sorts):
  250. col = sd.get("col", "")
  251. col = _GuiCoreScenarioProperties.get_col_name(col)
  252. order = sd.get("order", True)
  253. sorted_list = sorted(sorted_list, key=_GuiCoreContext.get_entity_property(col), reverse=not order)
  254. else:
  255. sorted_list = sorted(entities, key=_GuiCoreContext.get_entity_property("creation_date"))
  256. return [self.cycle_adapter(e, sorts) if isinstance(e, Cycle) else e for e in sorted_list]
  257. def get_scenarios(
  258. self,
  259. scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
  260. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  261. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  262. ):
  263. cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
  264. with self.lock:
  265. if self.scenario_by_cycle is None:
  266. self.scenario_by_cycle = get_cycles_scenarios()
  267. if scenarios is None:
  268. for cycle, c_scenarios in self.scenario_by_cycle.items():
  269. if cycle is None:
  270. cycles_scenarios.extend(c_scenarios)
  271. else:
  272. cycles_scenarios.append(cycle)
  273. if scenarios is not None:
  274. cycles_scenarios = scenarios
  275. adapted_list = self.get_sorted_entity_list(cycles_scenarios, sorts)
  276. if filters:
  277. # filtering
  278. filtered_list = list(adapted_list)
  279. for fd in filters:
  280. col = fd.get("col", "")
  281. is_datanode_prop = _get_datanode_property(col) is not None
  282. col_type = _GuiCoreScenarioProperties.get_type(col)
  283. col = _GuiCoreScenarioProperties.get_col_name(col)
  284. val = fd.get("value")
  285. action = fd.get("action", "")
  286. if isinstance(val, str) and col_type == "date":
  287. val = datetime.fromisoformat(val[:-1])
  288. # level 1 filtering
  289. filtered_list = [
  290. e
  291. for e in filtered_list
  292. if not isinstance(e, Scenario) or _invoke_action(e, col, col_type, is_datanode_prop, action, val)
  293. ]
  294. # level 2 filtering
  295. filtered_list = [
  296. self.filter_scenarios(e, col, col_type, is_datanode_prop, action, val)
  297. if not isinstance(e, Scenario)
  298. else e
  299. for e in filtered_list
  300. ]
  301. # remove empty cycles
  302. adapted_list = [
  303. e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))
  304. ]
  305. return adapted_list
  306. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  307. args = payload.get("args")
  308. if args is None or not isinstance(args, list) or len(args) < 2:
  309. return
  310. state.assign(args[0], args[1])
  311. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  312. if not id or not is_readable(t.cast(ScenarioId, id)):
  313. return None
  314. try:
  315. return core_get(t.cast(ScenarioId, id))
  316. except Exception:
  317. return None
  318. def get_scenario_configs(self):
  319. with self.lock:
  320. if self.scenario_configs is None:
  321. configs = Config.scenarios
  322. if isinstance(configs, dict):
  323. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  324. return self.scenario_configs
  325. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  326. args = payload.get("args")
  327. start_idx = 2
  328. if (
  329. args is None
  330. or not isinstance(args, list)
  331. or len(args) < start_idx + 3
  332. or not isinstance(args[start_idx], bool)
  333. or not isinstance(args[start_idx + 1], bool)
  334. or not isinstance(args[start_idx + 2], dict)
  335. ):
  336. return
  337. error_var = payload.get("error_id")
  338. update = args[start_idx]
  339. delete = args[start_idx + 1]
  340. data = args[start_idx + 2]
  341. with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
  342. scenario = None
  343. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  344. if update:
  345. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  346. if delete:
  347. if not is_deletable(scenario_id):
  348. state.assign(error_var, f"Scenario. {scenario_id} is not deletable.")
  349. return
  350. try:
  351. core_delete(scenario_id)
  352. except Exception as e:
  353. state.assign(error_var, f"Error deleting Scenario. {e}")
  354. else:
  355. if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
  356. return
  357. scenario = core_get(scenario_id)
  358. else:
  359. if with_dialog:
  360. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  361. scenario_config = Config.scenarios.get(config_id)
  362. if with_dialog and scenario_config is None:
  363. state.assign(error_var, f"Invalid configuration id ({config_id})")
  364. return
  365. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  366. try:
  367. date = parser.parse(date_str) if isinstance(date_str, str) else None
  368. except Exception as e:
  369. state.assign(error_var, f"Invalid date ({date_str}).{e}")
  370. return
  371. else:
  372. scenario_config = None
  373. date = None
  374. scenario_id = None
  375. try:
  376. gui = state.get_gui()
  377. on_creation = args[0] if isinstance(args[0], str) else None
  378. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  379. if callable(on_creation_function):
  380. try:
  381. res = gui._call_function_with_state(
  382. on_creation_function,
  383. [
  384. id,
  385. {
  386. "action": on_creation,
  387. "config": scenario_config,
  388. "date": date,
  389. "label": name,
  390. "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
  391. },
  392. ],
  393. )
  394. if isinstance(res, Scenario):
  395. # everything's fine
  396. scenario_id = res.id
  397. state.assign(error_var, "")
  398. return
  399. if res:
  400. # do not create
  401. state.assign(error_var, f"{res}")
  402. return
  403. except Exception as e: # pragma: no cover
  404. if not gui._call_on_exception(on_creation, e):
  405. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  406. state.assign(
  407. error_var,
  408. f"Error creating Scenario with '{on_creation}()'. {e}",
  409. )
  410. return
  411. elif on_creation is not None:
  412. _warn(f"on_creation(): '{on_creation}' is not a function.")
  413. elif not with_dialog:
  414. if len(Config.scenarios) == 2:
  415. scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
  416. else:
  417. state.assign(
  418. error_var,
  419. "Error creating Scenario: only one scenario config needed "
  420. + f"({len(Config.scenarios) - 1}) found.",
  421. )
  422. return
  423. scenario = create_scenario(scenario_config, date, name)
  424. scenario_id = scenario.id
  425. except Exception as e:
  426. state.assign(error_var, f"Error creating Scenario. {e}")
  427. finally:
  428. self.scenario_refresh(scenario_id)
  429. if scenario and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
  430. try:
  431. var_name, _ = gui._get_real_var_name(sel_scenario_var)
  432. state.assign(var_name, scenario)
  433. except Exception as e: # pragma: no cover
  434. _warn("Can't find value variable name in context", e)
  435. if scenario:
  436. if not is_editable(scenario):
  437. state.assign(error_var, f"Scenario {scenario_id or name} is not editable.")
  438. return
  439. with scenario as sc:
  440. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  441. if props := data.get("properties"):
  442. try:
  443. new_keys = [prop.get("key") for prop in props]
  444. for key in t.cast(dict, sc.properties).keys():
  445. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  446. t.cast(dict, sc.properties).pop(key, None)
  447. for prop in props:
  448. key = prop.get("key")
  449. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  450. sc._properties[key] = prop.get("value")
  451. state.assign(error_var, "")
  452. except Exception as e:
  453. state.assign(error_var, f"Error creating Scenario. {e}")
  454. @staticmethod
  455. def __assign_var(state: State, var_name: t.Optional[str], msg: str):
  456. if var_name:
  457. state.assign(var_name, msg)
  458. def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  459. args = payload.get("args")
  460. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  461. return
  462. error_var = payload.get("error_id")
  463. data = args[0]
  464. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  465. sequence = data.get("sequence")
  466. if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
  467. return
  468. scenario: Scenario = core_get(entity_id)
  469. if scenario:
  470. try:
  471. if not sequence:
  472. if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
  473. scenario.add_sequence(name, data.get("task_ids"))
  474. else:
  475. primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
  476. if primary is True:
  477. if not is_promotable(scenario):
  478. _GuiCoreContext.__assign_var(
  479. state, error_var, f"Scenario {entity_id} is not promotable."
  480. )
  481. return
  482. set_primary(scenario)
  483. self.__edit_properties(scenario, data)
  484. else:
  485. if data.get("del", False):
  486. scenario.remove_sequence(sequence)
  487. else:
  488. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  489. if sequence != name:
  490. scenario.rename_sequence(sequence, name)
  491. if seqEntity := scenario.sequences.get(name):
  492. seqEntity.tasks = data.get("task_ids")
  493. self.__edit_properties(seqEntity, data)
  494. else:
  495. _GuiCoreContext.__assign_var(
  496. state,
  497. error_var,
  498. f"Sequence {name} is not available in Scenario {entity_id}.",
  499. )
  500. return
  501. _GuiCoreContext.__assign_var(state, error_var, "")
  502. except Exception as e:
  503. _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
  504. def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  505. args = payload.get("args")
  506. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  507. return
  508. data = args[0]
  509. error_var = payload.get("error_id")
  510. try:
  511. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  512. entity = core_get(scenario_id)
  513. if sequence := data.get("sequence"):
  514. entity = entity.sequences.get(sequence)
  515. if not (reason := is_submittable(entity)):
  516. _GuiCoreContext.__assign_var(
  517. state,
  518. error_var,
  519. f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable: "
  520. + reason.reasons,
  521. )
  522. return
  523. if entity:
  524. on_submission = data.get("on_submission_change")
  525. submission_entity = core_submit(
  526. entity,
  527. on_submission=on_submission,
  528. client_id=self.gui._get_client_id(),
  529. module_context=self.gui._get_locals_context(),
  530. )
  531. if on_submission:
  532. with self.submissions_lock:
  533. self.client_submission[submission_entity.id] = submission_entity.submission_status
  534. if Config.core.mode == "development":
  535. with self.submissions_lock:
  536. self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
  537. self.submission_status_callback(submission_entity.id)
  538. _GuiCoreContext.__assign_var(state, error_var, "")
  539. except Exception as e:
  540. _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  541. def __do_datanodes_tree(self):
  542. if self.data_nodes_by_owner is None:
  543. self.data_nodes_by_owner = defaultdict(list)
  544. for dn in get_data_nodes():
  545. self.data_nodes_by_owner[dn.owner_id].append(dn)
  546. def get_datanodes_tree(self, scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]]):
  547. with self.lock:
  548. self.__do_datanodes_tree()
  549. if scenarios is None:
  550. return (self.data_nodes_by_owner.get(None, []) if self.data_nodes_by_owner else []) + (
  551. self.get_scenarios(None, None, None) or []
  552. )
  553. if not self.data_nodes_by_owner:
  554. return []
  555. if isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
  556. return scenarios
  557. owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
  558. return [d for owner in owners for d in self.data_nodes_by_owner.get(owner.id, [])]
  559. def data_node_adapter(self, data):
  560. if isinstance(data, (tuple, list)):
  561. return data
  562. try:
  563. if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
  564. if isinstance(data, DataNode):
  565. return (data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False)
  566. with self.lock:
  567. self.__do_datanodes_tree()
  568. if self.data_nodes_by_owner:
  569. if isinstance(data, Cycle):
  570. return (
  571. data.id,
  572. data.get_simple_label(),
  573. self.data_nodes_by_owner[data.id] + self.scenario_by_cycle.get(data, []),
  574. _EntityType.CYCLE.value,
  575. False,
  576. )
  577. elif isinstance(data, Scenario):
  578. return (
  579. data.id,
  580. data.get_simple_label(),
  581. self.data_nodes_by_owner[data.id] + list(data.sequences.values()),
  582. _EntityType.SCENARIO.value,
  583. data.is_primary,
  584. )
  585. elif isinstance(data, Sequence):
  586. if datanodes := self.data_nodes_by_owner.get(data.id):
  587. return (
  588. data.id,
  589. data.get_simple_label(),
  590. datanodes,
  591. _EntityType.SEQUENCE.value,
  592. False,
  593. )
  594. except Exception as e:
  595. _warn(
  596. f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
  597. e,
  598. )
  599. return None
  600. def get_jobs_list(self):
  601. with self.lock:
  602. if self.jobs_list is None:
  603. self.jobs_list = get_jobs()
  604. return self.jobs_list
  605. def job_adapter(self, job):
  606. try:
  607. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  608. if isinstance(job, Job):
  609. entity = core_get(job.owner_id)
  610. return (
  611. job.id,
  612. job.get_simple_label(),
  613. [],
  614. entity.get_simple_label() if entity else "",
  615. entity.id if entity else "",
  616. job.submit_id,
  617. job.creation_date,
  618. job.status.value,
  619. is_deletable(job),
  620. is_readable(job),
  621. is_editable(job),
  622. )
  623. except Exception as e:
  624. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  625. return None
  626. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  627. args = payload.get("args")
  628. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  629. return
  630. data = args[0]
  631. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  632. job_action = data.get(_GuiCoreContext.__ACTION)
  633. if job_action and isinstance(job_ids, list):
  634. errs = []
  635. if job_action == "delete":
  636. for job_id in job_ids:
  637. if not is_readable(job_id):
  638. errs.append(f"Job {job_id} is not readable.")
  639. continue
  640. if not is_deletable(job_id):
  641. errs.append(f"Job {job_id} is not deletable.")
  642. continue
  643. try:
  644. delete_job(core_get(job_id))
  645. except Exception as e:
  646. errs.append(f"Error deleting job. {e}")
  647. elif job_action == "cancel":
  648. for job_id in job_ids:
  649. if not is_readable(job_id):
  650. errs.append(f"Job {job_id} is not readable.")
  651. continue
  652. if not is_editable(job_id):
  653. errs.append(f"Job {job_id} is not cancelable.")
  654. continue
  655. try:
  656. cancel_job(job_id)
  657. except Exception as e:
  658. errs.append(f"Error canceling job. {e}")
  659. _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(errs) if errs else "")
  660. def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
  661. args = payload.get("args")
  662. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  663. return
  664. error_var = payload.get("error_id")
  665. data = args[0]
  666. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  667. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  668. return
  669. entity: DataNode = core_get(entity_id)
  670. if isinstance(entity, DataNode):
  671. try:
  672. self.__edit_properties(entity, data)
  673. _GuiCoreContext.__assign_var(state, error_var, "")
  674. except Exception as e:
  675. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode. {e}")
  676. def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
  677. args = payload.get("args")
  678. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  679. return
  680. data = args[0]
  681. error_var = payload.get("error_id")
  682. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  683. if not self.__check_readable_editable(state, entity_id, "Datanode", error_var):
  684. return
  685. lock = data.get("lock", True)
  686. entity: DataNode = core_get(entity_id)
  687. if isinstance(entity, DataNode):
  688. try:
  689. if lock:
  690. entity.lock_edit(self.gui._get_client_id())
  691. else:
  692. entity.unlock_edit(self.gui._get_client_id())
  693. _GuiCoreContext.__assign_var(state, error_var, "")
  694. except Exception as e:
  695. _GuiCoreContext.__assign_var(state, error_var, f"Error locking Datanode. {e}")
  696. def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
  697. with entity as ent:
  698. if isinstance(ent, Scenario):
  699. tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
  700. if isinstance(tags, (list, tuple)):
  701. ent.tags = dict(tags)
  702. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  703. if isinstance(name, str):
  704. if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
  705. setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
  706. else:
  707. ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  708. props = data.get("properties")
  709. if isinstance(props, (list, tuple)):
  710. for prop in props:
  711. key = prop.get("key")
  712. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  713. ent.properties[key] = prop.get("value")
  714. deleted_props = data.get("deleted_properties")
  715. if isinstance(deleted_props, (list, tuple)):
  716. for prop in deleted_props:
  717. key = prop.get("key")
  718. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  719. ent.properties.pop(key, None)
  720. @staticmethod
  721. def get_entity_property(col: str):
  722. def sort_key(entity: t.Union[Scenario, Cycle]):
  723. # we compare only strings
  724. try:
  725. val = attrgetter(col)(entity)
  726. except AttributeError as e:
  727. if _is_debugging():
  728. _warn("Attribute", e)
  729. val = ""
  730. return val.isoformat() if isinstance(val, (datetime, date)) else str(val)
  731. return sort_key
  732. def get_scenarios_for_owner(self, owner_id: str):
  733. cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
  734. with self.lock:
  735. if self.scenario_by_cycle is None:
  736. self.scenario_by_cycle = get_cycles_scenarios()
  737. if owner_id:
  738. if owner_id == "GLOBAL":
  739. for cycle, scenarios in self.scenario_by_cycle.items():
  740. if cycle is None:
  741. cycles_scenarios.extend(scenarios)
  742. else:
  743. cycles_scenarios.append(cycle)
  744. elif is_readable(t.cast(ScenarioId, owner_id)):
  745. entity = core_get(owner_id)
  746. if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
  747. cycles_scenarios.extend(scenarios_cycle)
  748. elif isinstance(entity, Scenario):
  749. cycles_scenarios.append(entity)
  750. return sorted(cycles_scenarios, key=_GuiCoreContext.get_entity_property("creation_date"))
  751. def get_data_node_history(self, id: str):
  752. if id and (dn := core_get(id)) and isinstance(dn, DataNode):
  753. res = []
  754. for e in dn.edits:
  755. job_id = e.get("job_id")
  756. job: t.Optional[Job] = None
  757. if job_id:
  758. if not is_readable(job_id):
  759. job_id += " not readable"
  760. else:
  761. job = core_get(job_id)
  762. res.append(
  763. (
  764. e.get("timestamp"),
  765. job_id if job_id else e.get("writer_identifier", ""),
  766. f"Execution of task {job.task.get_simple_label()}."
  767. if job and job.task
  768. else e.get("comment", ""),
  769. )
  770. )
  771. return sorted(res, key=lambda r: r[0], reverse=True)
  772. return _DoNotUpdate()
  773. def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
  774. if not is_readable(t.cast(ScenarioId, id)):
  775. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not readable.")
  776. return False
  777. if not is_editable(t.cast(ScenarioId, id)):
  778. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not editable.")
  779. return False
  780. return True
  781. def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
  782. args = payload.get("args")
  783. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  784. return
  785. data = args[0]
  786. error_var = payload.get("error_id")
  787. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  788. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  789. return
  790. entity: DataNode = core_get(entity_id)
  791. if isinstance(entity, DataNode):
  792. try:
  793. entity.write(
  794. parser.parse(data.get("value"))
  795. if data.get("type") == "date"
  796. else int(data.get("value"))
  797. if data.get("type") == "int"
  798. else float(data.get("value"))
  799. if data.get("type") == "float"
  800. else data.get("value"),
  801. comment=data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT),
  802. )
  803. entity.unlock_edit(self.gui._get_client_id())
  804. _GuiCoreContext.__assign_var(state, error_var, "")
  805. except Exception as e:
  806. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode value. {e}")
  807. _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id) # this will update the data value
  808. def tabular_data_edit(self, state: State, var_name: str, payload: dict):
  809. error_var = payload.get("error_id")
  810. user_data = payload.get("user_data", {})
  811. dn_id = user_data.get("dn_id")
  812. if not self.__check_readable_editable(state, dn_id, "DataNode", error_var):
  813. return
  814. datanode = core_get(dn_id) if dn_id else None
  815. if isinstance(datanode, DataNode):
  816. try:
  817. idx = t.cast(int, payload.get("index"))
  818. col = t.cast(str, payload.get("col"))
  819. tz = payload.get("tz")
  820. val = (
  821. parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
  822. if tz is not None
  823. else payload.get("value")
  824. )
  825. # user_value = payload.get("user_value")
  826. data = self.__read_tabular_data(datanode)
  827. new_data: t.Any = None
  828. if isinstance(data, (pd.DataFrame, pd.Series)):
  829. if isinstance(data, pd.DataFrame):
  830. data.at[idx, col] = val
  831. elif isinstance(data, pd.Series):
  832. data.at[idx] = val
  833. new_data = data
  834. else:
  835. data_tuple = False
  836. if isinstance(data, tuple):
  837. data_tuple = True
  838. data = list(data)
  839. if isinstance(data, list):
  840. row = data[idx]
  841. row_tuple = False
  842. if isinstance(row, tuple):
  843. row = list(row)
  844. row_tuple = True
  845. if isinstance(row, list):
  846. row[int(col)] = val
  847. if row_tuple:
  848. data[idx] = tuple(row)
  849. new_data = data
  850. elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
  851. data[idx] = val
  852. new_data = data
  853. else:
  854. _GuiCoreContext.__assign_var(
  855. state,
  856. error_var,
  857. "Error updating Datanode: cannot handle multi-column list value.",
  858. )
  859. if data_tuple and new_data is not None:
  860. new_data = tuple(new_data)
  861. else:
  862. _GuiCoreContext.__assign_var(
  863. state,
  864. error_var,
  865. "Error updating Datanode tabular value: type does not support at[] indexer.",
  866. )
  867. if new_data is not None:
  868. datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
  869. _GuiCoreContext.__assign_var(state, error_var, "")
  870. except Exception as e:
  871. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode tabular value. {e}")
  872. _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  873. def get_data_node_properties(self, id: str):
  874. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  875. try:
  876. return (
  877. (
  878. (k, f"{v}")
  879. for k, v in dn._get_user_properties().items()
  880. if k != _GuiCoreContext.__PROP_ENTITY_NAME
  881. ),
  882. )
  883. except Exception:
  884. return None
  885. return None
  886. def __read_tabular_data(self, datanode: DataNode):
  887. return datanode.read()
  888. def get_data_node_tabular_data(self, id: str):
  889. if (
  890. id
  891. and is_readable(t.cast(DataNodeId, id))
  892. and (dn := core_get(id))
  893. and isinstance(dn, DataNode)
  894. and dn.is_ready_for_reading
  895. ):
  896. try:
  897. value = self.__read_tabular_data(dn)
  898. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  899. return value
  900. except Exception:
  901. return None
  902. return None
  903. def get_data_node_tabular_columns(self, id: str):
  904. if (
  905. id
  906. and is_readable(t.cast(DataNodeId, id))
  907. and (dn := core_get(id))
  908. and isinstance(dn, DataNode)
  909. and dn.is_ready_for_reading
  910. ):
  911. try:
  912. value = self.__read_tabular_data(dn)
  913. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  914. return self.gui._tbl_cols(
  915. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=value
  916. )
  917. except Exception:
  918. return None
  919. return None
  920. def get_data_node_chart_config(self, id: str):
  921. if (
  922. id
  923. and is_readable(t.cast(DataNodeId, id))
  924. and (dn := core_get(id))
  925. and isinstance(dn, DataNode)
  926. and dn.is_ready_for_reading
  927. ):
  928. try:
  929. return self.gui._chart_conf(
  930. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
  931. )
  932. except Exception:
  933. return None
  934. return None
  935. def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
  936. args = payload.get("args")
  937. if args is None or not isinstance(args, list) or len(args) < 2:
  938. return
  939. on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
  940. if callable(on_action_function):
  941. try:
  942. entity = core_get(args[0]) if is_readable(args[0]) else f"unredable({args[0]})"
  943. self.gui._call_function_with_state(
  944. on_action_function,
  945. [entity],
  946. )
  947. except Exception as e:
  948. if not self.gui._call_on_exception(args[1], e):
  949. _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
  950. elif args[1]:
  951. _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")