_context.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import typing as t
  13. from collections import defaultdict
  14. from numbers import Number
  15. from threading import Lock
  16. try:
  17. import zoneinfo
  18. except ImportError:
  19. from backports import zoneinfo # type: ignore[no-redef]
  20. import pandas as pd
  21. from dateutil import parser
  22. from taipy.config import Config
  23. from taipy.core import (
  24. Cycle,
  25. DataNode,
  26. DataNodeId,
  27. Job,
  28. Scenario,
  29. ScenarioId,
  30. Sequence,
  31. SequenceId,
  32. Submission,
  33. SubmissionId,
  34. cancel_job,
  35. create_scenario,
  36. delete_job,
  37. get_cycles_scenarios,
  38. get_data_nodes,
  39. get_jobs,
  40. is_deletable,
  41. is_editable,
  42. is_promotable,
  43. is_readable,
  44. is_submittable,
  45. set_primary,
  46. )
  47. from taipy.core import delete as core_delete
  48. from taipy.core import get as core_get
  49. from taipy.core import submit as core_submit
  50. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  51. from taipy.core.notification.event import Event, EventOperation
  52. from taipy.core.notification.notifier import Notifier
  53. from taipy.core.submission.submission_status import SubmissionStatus
  54. from taipy.core.taipy import can_create
  55. from taipy.gui import Gui, State
  56. from taipy.gui._warnings import _warn
  57. from taipy.gui.gui import _DoNotUpdate
  58. from ._adapters import (
  59. CustomScenarioFilter,
  60. _EntityType,
  61. _get_entity_property,
  62. _GuiCoreDatanodeAdapter,
  63. _GuiCoreScenarioProperties,
  64. _invoke_action,
  65. )
  66. class _GuiCoreContext(CoreEventConsumerBase):
  67. __PROP_ENTITY_ID = "id"
  68. __PROP_ENTITY_COMMENT = "comment"
  69. __PROP_CONFIG_ID = "config"
  70. __PROP_DATE = "date"
  71. __PROP_ENTITY_NAME = "name"
  72. __PROP_SCENARIO_PRIMARY = "primary"
  73. __PROP_SCENARIO_TAGS = "tags"
  74. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  75. __ACTION = "action"
  76. _CORE_CHANGED_NAME = "core_changed"
  77. def __init__(self, gui: Gui) -> None:
  78. self.gui = gui
  79. self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
  80. self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
  81. self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
  82. self.jobs_list: t.Optional[t.List[Job]] = None
  83. self.client_submission: t.Dict[str, SubmissionStatus] = {}
  84. # register to taipy core notification
  85. reg_id, reg_queue = Notifier.register()
  86. # locks
  87. self.lock = Lock()
  88. self.submissions_lock = Lock()
  89. # super
  90. super().__init__(reg_id, reg_queue)
  91. self.start()
  92. def process_event(self, event: Event):
  93. if event.entity_type == EventEntityType.SCENARIO:
  94. with self.gui._get_autorization(system=True):
  95. self.scenario_refresh(
  96. event.entity_id
  97. if event.operation == EventOperation.DELETION or is_readable(t.cast(ScenarioId, event.entity_id))
  98. else None
  99. )
  100. elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
  101. sequence = None
  102. try:
  103. with self.gui._get_autorization(system=True):
  104. sequence = (
  105. core_get(event.entity_id)
  106. if event.operation != EventOperation.DELETION
  107. and is_readable(t.cast(SequenceId, event.entity_id))
  108. else None
  109. )
  110. if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
  111. self.broadcast_core_changed({"scenario": list(sequence.parent_ids)})
  112. except Exception as e:
  113. _warn(f"Access to sequence {event.entity_id} failed", e)
  114. elif event.entity_type == EventEntityType.JOB:
  115. with self.lock:
  116. self.jobs_list = None
  117. self.broadcast_core_changed({"jobs": event.entity_id})
  118. elif event.entity_type == EventEntityType.SUBMISSION:
  119. self.submission_status_callback(event.entity_id, event)
  120. elif event.entity_type == EventEntityType.DATA_NODE:
  121. with self.lock:
  122. self.data_nodes_by_owner = None
  123. self.broadcast_core_changed(
  124. {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True}
  125. )
  126. def broadcast_core_changed(self, payload: t.Dict[str, t.Any], client_id: t.Optional[str] = None):
  127. self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, payload, client_id)
  128. def scenario_refresh(self, scenario_id: t.Optional[str]):
  129. with self.lock:
  130. self.scenario_by_cycle = None
  131. self.data_nodes_by_owner = None
  132. self.broadcast_core_changed({"scenario": scenario_id or True})
  133. def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
  134. if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
  135. return
  136. submission = None
  137. new_status = None
  138. payload: t.Optional[t.Dict[str, t.Any]] = None
  139. client_id: t.Optional[str] = None
  140. try:
  141. last_status = self.client_submission.get(submission_id)
  142. if not last_status:
  143. return
  144. submission = t.cast(Submission, core_get(submission_id))
  145. if not submission or not submission.entity_id:
  146. return
  147. payload = {}
  148. new_status = t.cast(SubmissionStatus, submission.submission_status)
  149. client_id = submission.properties.get("client_id")
  150. if client_id:
  151. running_tasks = {}
  152. with self.gui._get_autorization(client_id):
  153. for job in submission.jobs:
  154. job = job if isinstance(job, Job) else core_get(job)
  155. running_tasks[job.task.id] = (
  156. SubmissionStatus.RUNNING.value
  157. if job.is_running()
  158. else SubmissionStatus.PENDING.value
  159. if job.is_pending()
  160. else None
  161. )
  162. payload.update(tasks=running_tasks)
  163. if last_status != new_status:
  164. # callback
  165. submission_name = submission.properties.get("on_submission")
  166. if submission_name:
  167. self.gui.invoke_callback(
  168. client_id,
  169. submission_name,
  170. [
  171. core_get(submission.id),
  172. {
  173. "submission_status": new_status.name,
  174. "submittable_entity": core_get(submission.entity_id),
  175. **(event.metadata if event else {}),
  176. },
  177. ],
  178. submission.properties.get("module_context"),
  179. )
  180. with self.submissions_lock:
  181. if new_status in (
  182. SubmissionStatus.COMPLETED,
  183. SubmissionStatus.FAILED,
  184. SubmissionStatus.CANCELED,
  185. ):
  186. self.client_submission.pop(submission_id, None)
  187. else:
  188. self.client_submission[submission_id] = new_status
  189. except Exception as e:
  190. _warn(f"Submission ({submission_id}) is not available", e)
  191. finally:
  192. if payload is not None:
  193. payload.update(jobs=True)
  194. entity_id = submission.entity_id if submission else None
  195. if entity_id:
  196. payload.update(scenario=entity_id)
  197. if new_status:
  198. payload.update(submission=new_status.value)
  199. self.broadcast_core_changed(payload, client_id)
  200. def no_change_adapter(self, entity: t.List):
  201. return entity
  202. def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
  203. try:
  204. if (
  205. isinstance(cycle, Cycle)
  206. and is_readable(cycle.id)
  207. and core_get(cycle.id) is not None
  208. and self.scenario_by_cycle
  209. ):
  210. return [
  211. cycle.id,
  212. cycle.get_simple_label(),
  213. self.get_sorted_scenario_list(self.scenario_by_cycle.get(cycle, []), sorts),
  214. _EntityType.CYCLE.value,
  215. False,
  216. ]
  217. except Exception as e:
  218. _warn(
  219. f"Access to {type(cycle).__name__} " + f"({cycle.id if hasattr(cycle, 'id') else 'No_id'})" + " failed",
  220. e,
  221. )
  222. return None
  223. def scenario_adapter(self, scenario: Scenario):
  224. if isinstance(scenario, (tuple, list)):
  225. return scenario
  226. try:
  227. if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
  228. return [
  229. scenario.id,
  230. scenario.get_simple_label(),
  231. None,
  232. _EntityType.SCENARIO.value,
  233. scenario.is_primary,
  234. ]
  235. except Exception as e:
  236. _warn(
  237. f"Access to {type(scenario).__name__} "
  238. + f"({scenario.id if hasattr(scenario, 'id') else 'No_id'})"
  239. + " failed",
  240. e,
  241. )
  242. return None
  243. def filter_entities(
  244. self, cycle_scenario: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any, col_fn=None
  245. ):
  246. cycle_scenario[2] = [
  247. e for e in cycle_scenario[2] if _invoke_action(e, col, col_type, is_dn, action, val, col_fn)
  248. ]
  249. return cycle_scenario
  250. def adapt_scenarios(self, cycle: t.List):
  251. cycle[2] = [self.scenario_adapter(e) for e in cycle[2]]
  252. return cycle
  253. def get_sorted_scenario_list(
  254. self,
  255. entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
  256. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  257. ):
  258. if sorts:
  259. sorted_list = entities
  260. for sd in reversed(sorts):
  261. col = sd.get("col", "")
  262. order = sd.get("order", True)
  263. sorted_list = sorted(sorted_list, key=_get_entity_property(col, Scenario, Cycle), reverse=not order)
  264. else:
  265. sorted_list = sorted(entities, key=_get_entity_property("creation_date", Scenario, Cycle))
  266. return [self.cycle_adapter(e, sorts) if isinstance(e, Cycle) else e for e in sorted_list]
  267. def get_filtered_scenario_list(
  268. self,
  269. entities: t.List[t.Union[t.List, Scenario]],
  270. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  271. ):
  272. if not filters:
  273. return entities
  274. # filtering
  275. filtered_list = list(entities)
  276. for fd in filters:
  277. col = fd.get("col", "")
  278. is_datanode_prop = _GuiCoreScenarioProperties.is_datanode_property(col)
  279. col_type = fd.get("type", "no type")
  280. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  281. val = fd.get("value")
  282. action = fd.get("action", "")
  283. customs = CustomScenarioFilter._get_custom(col)
  284. if customs:
  285. with self.gui._set_locals_context(customs[0] or None):
  286. fn = self.gui._get_user_function(customs[1])
  287. if callable(fn):
  288. col = fn
  289. if (
  290. isinstance(col, str)
  291. and next(filter(lambda s: not s.isidentifier(), (col_fn or col).split(".")), False) is True
  292. ):
  293. _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
  294. continue
  295. # level 1 filtering
  296. filtered_list = [
  297. e
  298. for e in filtered_list
  299. if not isinstance(e, Scenario)
  300. or _invoke_action(e, col, col_type, is_datanode_prop, action, val, col_fn)
  301. ]
  302. # level 2 filtering
  303. filtered_list = [
  304. e
  305. if isinstance(e, Scenario)
  306. else self.filter_entities(e, col, col_type, is_datanode_prop, action, val, col_fn)
  307. for e in filtered_list
  308. ]
  309. # remove empty cycles
  310. return [e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))]
  311. def get_scenarios(
  312. self,
  313. scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
  314. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  315. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  316. ):
  317. cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
  318. with self.lock:
  319. # always needed to get scenarios for a cycle in cycle_adapter
  320. if self.scenario_by_cycle is None:
  321. self.scenario_by_cycle = get_cycles_scenarios()
  322. if scenarios is None:
  323. for cycle, c_scenarios in self.scenario_by_cycle.items():
  324. if cycle is None:
  325. cycles_scenarios.extend(c_scenarios)
  326. else:
  327. cycles_scenarios.append(cycle)
  328. if scenarios is not None:
  329. cycles_scenarios = scenarios
  330. adapted_list = self.get_sorted_scenario_list(cycles_scenarios, sorts)
  331. adapted_list = self.get_filtered_scenario_list(adapted_list, filters)
  332. return adapted_list
  333. def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
  334. args = payload.get("args")
  335. if args is None or not isinstance(args, list) or len(args) < 2:
  336. return
  337. state.assign(args[0], args[1])
  338. def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
  339. if not id or not is_readable(t.cast(ScenarioId, id)):
  340. return None
  341. try:
  342. return core_get(t.cast(ScenarioId, id))
  343. except Exception:
  344. return None
  345. def get_scenario_configs(self):
  346. with self.lock:
  347. if self.scenario_configs is None:
  348. configs = Config.scenarios
  349. if isinstance(configs, dict):
  350. self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
  351. return self.scenario_configs
  352. def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
  353. args = payload.get("args")
  354. start_idx = 2
  355. if (
  356. args is None
  357. or not isinstance(args, list)
  358. or len(args) < start_idx + 3
  359. or not isinstance(args[start_idx], bool)
  360. or not isinstance(args[start_idx + 1], bool)
  361. or not isinstance(args[start_idx + 2], dict)
  362. ):
  363. return
  364. error_var = payload.get("error_id")
  365. update = args[start_idx]
  366. delete = args[start_idx + 1]
  367. data = args[start_idx + 2]
  368. with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
  369. scenario = None
  370. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  371. if update:
  372. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  373. if delete:
  374. if not is_deletable(scenario_id):
  375. state.assign(error_var, f"Scenario. {scenario_id} is not deletable.")
  376. return
  377. try:
  378. core_delete(scenario_id)
  379. except Exception as e:
  380. state.assign(error_var, f"Error deleting Scenario. {e}")
  381. else:
  382. if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
  383. return
  384. scenario = core_get(scenario_id)
  385. else:
  386. if with_dialog:
  387. config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
  388. scenario_config = Config.scenarios.get(config_id)
  389. if with_dialog and scenario_config is None:
  390. state.assign(error_var, f"Invalid configuration id ({config_id})")
  391. return
  392. date_str = data.get(_GuiCoreContext.__PROP_DATE)
  393. try:
  394. date = parser.parse(date_str) if isinstance(date_str, str) else None
  395. except Exception as e:
  396. state.assign(error_var, f"Invalid date ({date_str}).{e}")
  397. return
  398. else:
  399. scenario_config = None
  400. date = None
  401. scenario_id = None
  402. try:
  403. gui = state.get_gui()
  404. on_creation = args[0] if isinstance(args[0], str) else None
  405. on_creation_function = gui._get_user_function(on_creation) if on_creation else None
  406. if callable(on_creation_function):
  407. try:
  408. res = gui._call_function_with_state(
  409. on_creation_function,
  410. [
  411. id,
  412. {
  413. "action": on_creation,
  414. "config": scenario_config,
  415. "date": date,
  416. "label": name,
  417. "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
  418. },
  419. ],
  420. )
  421. if isinstance(res, Scenario):
  422. # everything's fine
  423. scenario_id = res.id
  424. state.assign(error_var, "")
  425. return
  426. if res:
  427. # do not create
  428. state.assign(error_var, f"{res}")
  429. return
  430. except Exception as e: # pragma: no cover
  431. if not gui._call_on_exception(on_creation, e):
  432. _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
  433. state.assign(
  434. error_var,
  435. f"Error creating Scenario with '{on_creation}()'. {e}",
  436. )
  437. return
  438. elif on_creation is not None:
  439. _warn(f"on_creation(): '{on_creation}' is not a function.")
  440. elif not with_dialog:
  441. if len(Config.scenarios) == 2:
  442. scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
  443. else:
  444. state.assign(
  445. error_var,
  446. "Error creating Scenario: only one scenario config needed "
  447. + f"({len(Config.scenarios) - 1}) found.",
  448. )
  449. return
  450. scenario = create_scenario(scenario_config, date, name)
  451. scenario_id = scenario.id
  452. except Exception as e:
  453. state.assign(error_var, f"Error creating Scenario. {e}")
  454. finally:
  455. self.scenario_refresh(scenario_id)
  456. if scenario and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
  457. try:
  458. var_name, _ = gui._get_real_var_name(sel_scenario_var)
  459. state.assign(var_name, scenario)
  460. except Exception as e: # pragma: no cover
  461. _warn("Can't find value variable name in context", e)
  462. if scenario:
  463. if not is_editable(scenario):
  464. state.assign(error_var, f"Scenario {scenario_id or name} is not editable.")
  465. return
  466. with scenario as sc:
  467. sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  468. if props := data.get("properties"):
  469. try:
  470. new_keys = [prop.get("key") for prop in props]
  471. for key in t.cast(dict, sc.properties).keys():
  472. if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
  473. t.cast(dict, sc.properties).pop(key, None)
  474. for prop in props:
  475. key = prop.get("key")
  476. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  477. sc._properties[key] = prop.get("value")
  478. state.assign(error_var, "")
  479. except Exception as e:
  480. state.assign(error_var, f"Error creating Scenario. {e}")
  481. @staticmethod
  482. def __assign_var(state: State, var_name: t.Optional[str], msg: str):
  483. if var_name:
  484. state.assign(var_name, msg)
  485. def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  486. args = payload.get("args")
  487. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  488. return
  489. error_var = payload.get("error_id")
  490. data = args[0]
  491. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  492. sequence = data.get("sequence")
  493. if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
  494. return
  495. scenario: Scenario = core_get(entity_id)
  496. if scenario:
  497. try:
  498. if not sequence:
  499. if isinstance(sequence, str) and (name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
  500. scenario.add_sequence(name, data.get("task_ids"))
  501. else:
  502. primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
  503. if primary is True:
  504. if not is_promotable(scenario):
  505. _GuiCoreContext.__assign_var(
  506. state, error_var, f"Scenario {entity_id} is not promotable."
  507. )
  508. return
  509. set_primary(scenario)
  510. self.__edit_properties(scenario, data)
  511. else:
  512. if data.get("del", False):
  513. scenario.remove_sequence(sequence)
  514. else:
  515. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  516. if sequence != name:
  517. scenario.rename_sequence(sequence, name)
  518. if seqEntity := scenario.sequences.get(name):
  519. seqEntity.tasks = data.get("task_ids")
  520. self.__edit_properties(seqEntity, data)
  521. else:
  522. _GuiCoreContext.__assign_var(
  523. state,
  524. error_var,
  525. f"Sequence {name} is not available in Scenario {entity_id}.",
  526. )
  527. return
  528. _GuiCoreContext.__assign_var(state, error_var, "")
  529. except Exception as e:
  530. _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
  531. def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
  532. args = payload.get("args")
  533. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  534. return
  535. data = args[0]
  536. error_var = payload.get("error_id")
  537. try:
  538. scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  539. entity = core_get(scenario_id)
  540. if sequence := data.get("sequence"):
  541. entity = entity.sequences.get(sequence)
  542. if not (reason := is_submittable(entity)):
  543. _GuiCoreContext.__assign_var(
  544. state,
  545. error_var,
  546. f"{'Sequence' if sequence else 'Scenario'} {sequence or scenario_id} is not submittable: "
  547. + reason.reasons,
  548. )
  549. return
  550. if entity:
  551. on_submission = data.get("on_submission_change")
  552. submission_entity = core_submit(
  553. entity,
  554. on_submission=on_submission,
  555. client_id=self.gui._get_client_id(),
  556. module_context=self.gui._get_locals_context(),
  557. )
  558. with self.submissions_lock:
  559. self.client_submission[submission_entity.id] = submission_entity.submission_status
  560. if Config.core.mode == "development":
  561. with self.submissions_lock:
  562. self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
  563. self.submission_status_callback(submission_entity.id)
  564. _GuiCoreContext.__assign_var(state, error_var, "")
  565. except Exception as e:
  566. _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  567. def get_filtered_datanode_list(
  568. self,
  569. entities: t.List[t.Union[t.List, DataNode]],
  570. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  571. ):
  572. if not filters or not entities:
  573. return entities
  574. # filtering
  575. filtered_list = list(entities)
  576. for fd in filters:
  577. col = fd.get("col", "")
  578. col_type = fd.get("type", "no type")
  579. col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
  580. val = fd.get("value")
  581. action = fd.get("action", "")
  582. customs = CustomScenarioFilter._get_custom(col)
  583. if customs:
  584. with self.gui._set_locals_context(customs[0] or None):
  585. fn = self.gui._get_user_function(customs[1])
  586. if callable(fn):
  587. col = fn
  588. if (
  589. isinstance(col, str)
  590. and next(filter(lambda s: not s.isidentifier(), (col_fn or col).split(".")), False) is True
  591. ):
  592. _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
  593. continue
  594. # level 1 filtering
  595. filtered_list = [
  596. e
  597. for e in filtered_list
  598. if not isinstance(e, DataNode) or _invoke_action(e, col, col_type, False, action, val, col_fn)
  599. ]
  600. # level 3 filtering
  601. filtered_list = [
  602. e if isinstance(e, DataNode) else self.filter_entities(d, col, col_type, False, action, val, col_fn)
  603. for e in filtered_list
  604. for d in e[2]
  605. ]
  606. # remove empty cycles
  607. return [e for e in filtered_list if isinstance(e, DataNode) or (isinstance(e, (tuple, list)) and len(e[2]))]
  608. def get_sorted_datanode_list(
  609. self,
  610. entities: t.Union[
  611. t.List[t.Union[Cycle, Scenario, DataNode]], t.List[t.Union[Scenario, DataNode]], t.List[DataNode]
  612. ],
  613. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  614. adapt_dn=False,
  615. ):
  616. if not entities:
  617. return entities
  618. if sorts:
  619. sorted_list = entities
  620. for sd in reversed(sorts):
  621. col = sd.get("col", "")
  622. order = sd.get("order", True)
  623. sorted_list = sorted(sorted_list, key=_get_entity_property(col, DataNode), reverse=not order)
  624. else:
  625. sorted_list = entities
  626. return [self.data_node_adapter(e, sorts, adapt_dn) for e in sorted_list]
  627. def __do_datanodes_tree(self):
  628. if self.data_nodes_by_owner is None:
  629. self.data_nodes_by_owner = defaultdict(list)
  630. for dn in get_data_nodes():
  631. self.data_nodes_by_owner[dn.owner_id].append(dn)
  632. def get_datanodes_tree(
  633. self,
  634. scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]],
  635. datanodes: t.Optional[t.List[DataNode]],
  636. filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  637. sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  638. ):
  639. base_list = []
  640. with self.lock:
  641. self.__do_datanodes_tree()
  642. if datanodes is None:
  643. if scenarios is None:
  644. base_list = (self.data_nodes_by_owner or {}).get(None, []) + (
  645. self.get_scenarios(None, None, None) or []
  646. )
  647. else:
  648. if isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
  649. base_list = scenarios
  650. else:
  651. if self.data_nodes_by_owner:
  652. owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
  653. base_list = [d for owner in owners for d in (self.data_nodes_by_owner).get(owner.id, [])]
  654. else:
  655. base_list = []
  656. else:
  657. base_list = datanodes
  658. adapted_list = self.get_sorted_datanode_list(base_list, sorts)
  659. return self.get_filtered_datanode_list(adapted_list, filters)
  660. def data_node_adapter(
  661. self,
  662. data: t.Union[Cycle, Scenario, Sequence, DataNode],
  663. sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None,
  664. adapt_dn=True,
  665. ):
  666. if isinstance(data, tuple):
  667. raise NotImplementedError
  668. if isinstance(data, list):
  669. if data[2] and isinstance(data[2][0], (Cycle, Scenario, Sequence, DataNode)):
  670. data[2] = self.get_sorted_datanode_list(data[2], sorts, False)
  671. return data
  672. try:
  673. if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
  674. if isinstance(data, DataNode):
  675. return (
  676. [data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False]
  677. if adapt_dn
  678. else data
  679. )
  680. with self.lock:
  681. self.__do_datanodes_tree()
  682. if self.data_nodes_by_owner:
  683. if isinstance(data, Cycle):
  684. return [
  685. data.id,
  686. data.get_simple_label(),
  687. self.get_sorted_datanode_list(
  688. self.data_nodes_by_owner.get(data.id, [])
  689. + (self.scenario_by_cycle or {}).get(data, []),
  690. sorts,
  691. False,
  692. ),
  693. _EntityType.CYCLE.value,
  694. False,
  695. ]
  696. elif isinstance(data, Scenario):
  697. return [
  698. data.id,
  699. data.get_simple_label(),
  700. self.get_sorted_datanode_list(
  701. self.data_nodes_by_owner.get(data.id, []) + list(data.sequences.values()),
  702. sorts,
  703. False,
  704. ),
  705. _EntityType.SCENARIO.value,
  706. data.is_primary,
  707. ]
  708. elif isinstance(data, Sequence):
  709. if datanodes := self.data_nodes_by_owner.get(data.id):
  710. return [
  711. data.id,
  712. data.get_simple_label(),
  713. self.get_sorted_datanode_list(datanodes, sorts, False),
  714. _EntityType.SEQUENCE.value,
  715. ]
  716. except Exception as e:
  717. _warn(
  718. f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
  719. e,
  720. )
  721. return None
  722. def get_jobs_list(self):
  723. with self.lock:
  724. if self.jobs_list is None:
  725. self.jobs_list = get_jobs()
  726. return self.jobs_list
  727. def job_adapter(self, job):
  728. try:
  729. if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
  730. if isinstance(job, Job):
  731. entity = core_get(job.owner_id)
  732. return (
  733. job.id,
  734. job.get_simple_label(),
  735. [],
  736. entity.get_simple_label() if entity else "",
  737. entity.id if entity else "",
  738. job.submit_id,
  739. job.creation_date,
  740. job.status.value,
  741. is_deletable(job),
  742. is_readable(job),
  743. is_editable(job),
  744. )
  745. except Exception as e:
  746. _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
  747. return None
  748. def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
  749. args = payload.get("args")
  750. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  751. return
  752. data = args[0]
  753. job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  754. job_action = data.get(_GuiCoreContext.__ACTION)
  755. if job_action and isinstance(job_ids, list):
  756. errs = []
  757. if job_action == "delete":
  758. for job_id in job_ids:
  759. if not is_readable(job_id):
  760. errs.append(f"Job {job_id} is not readable.")
  761. continue
  762. if not is_deletable(job_id):
  763. errs.append(f"Job {job_id} is not deletable.")
  764. continue
  765. try:
  766. delete_job(core_get(job_id))
  767. except Exception as e:
  768. errs.append(f"Error deleting job. {e}")
  769. elif job_action == "cancel":
  770. for job_id in job_ids:
  771. if not is_readable(job_id):
  772. errs.append(f"Job {job_id} is not readable.")
  773. continue
  774. if not is_editable(job_id):
  775. errs.append(f"Job {job_id} is not cancelable.")
  776. continue
  777. try:
  778. cancel_job(job_id)
  779. except Exception as e:
  780. errs.append(f"Error canceling job. {e}")
  781. _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(errs) if errs else "")
  782. def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
  783. args = payload.get("args")
  784. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  785. return
  786. error_var = payload.get("error_id")
  787. data = args[0]
  788. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  789. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  790. return
  791. entity: DataNode = core_get(entity_id)
  792. if isinstance(entity, DataNode):
  793. try:
  794. self.__edit_properties(entity, data)
  795. _GuiCoreContext.__assign_var(state, error_var, "")
  796. except Exception as e:
  797. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode. {e}")
  798. def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
  799. args = payload.get("args")
  800. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  801. return
  802. data = args[0]
  803. error_var = payload.get("error_id")
  804. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  805. if not self.__check_readable_editable(state, entity_id, "Datanode", error_var):
  806. return
  807. lock = data.get("lock", True)
  808. entity: DataNode = core_get(entity_id)
  809. if isinstance(entity, DataNode):
  810. try:
  811. if lock:
  812. entity.lock_edit(self.gui._get_client_id())
  813. else:
  814. entity.unlock_edit(self.gui._get_client_id())
  815. _GuiCoreContext.__assign_var(state, error_var, "")
  816. except Exception as e:
  817. _GuiCoreContext.__assign_var(state, error_var, f"Error locking Datanode. {e}")
  818. def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
  819. with entity as ent:
  820. if isinstance(ent, Scenario):
  821. tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
  822. if isinstance(tags, (list, tuple)):
  823. ent.tags = set(tags)
  824. name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
  825. if isinstance(name, str):
  826. if hasattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME):
  827. setattr(ent, _GuiCoreContext.__PROP_ENTITY_NAME, name)
  828. else:
  829. ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
  830. props = data.get("properties")
  831. if isinstance(props, (list, tuple)):
  832. for prop in props:
  833. key = prop.get("key")
  834. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  835. ent.properties[key] = prop.get("value")
  836. deleted_props = data.get("deleted_properties")
  837. if isinstance(deleted_props, (list, tuple)):
  838. for prop in deleted_props:
  839. key = prop.get("key")
  840. if key and key not in _GuiCoreContext.__ENTITY_PROPS:
  841. ent.properties.pop(key, None)
  842. def get_scenarios_for_owner(self, owner_id: str):
  843. cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
  844. with self.lock:
  845. if self.scenario_by_cycle is None:
  846. self.scenario_by_cycle = get_cycles_scenarios()
  847. if owner_id:
  848. if owner_id == "GLOBAL":
  849. for cycle, scenarios in self.scenario_by_cycle.items():
  850. if cycle is None:
  851. cycles_scenarios.extend(scenarios)
  852. else:
  853. cycles_scenarios.append(cycle)
  854. elif is_readable(t.cast(ScenarioId, owner_id)):
  855. entity = core_get(owner_id)
  856. if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
  857. cycles_scenarios.extend(scenarios_cycle)
  858. elif isinstance(entity, Scenario):
  859. cycles_scenarios.append(entity)
  860. return sorted(cycles_scenarios, key=_get_entity_property("creation_date", Scenario))
  861. def get_data_node_history(self, id: str):
  862. if id and (dn := core_get(id)) and isinstance(dn, DataNode):
  863. res = []
  864. for e in dn.edits:
  865. job_id = e.get("job_id")
  866. job: t.Optional[Job] = None
  867. if job_id:
  868. if not is_readable(job_id):
  869. job_id += " not readable"
  870. else:
  871. job = core_get(job_id)
  872. res.append(
  873. (
  874. e.get("timestamp"),
  875. job_id if job_id else e.get("writer_identifier", ""),
  876. f"Execution of task {job.task.get_simple_label()}."
  877. if job and job.task
  878. else e.get("comment", ""),
  879. )
  880. )
  881. return sorted(res, key=lambda r: r[0], reverse=True)
  882. return _DoNotUpdate()
  883. def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
  884. if not is_readable(t.cast(ScenarioId, id)):
  885. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not readable.")
  886. return False
  887. if not is_editable(t.cast(ScenarioId, id)):
  888. _GuiCoreContext.__assign_var(state, var, f"{ent_type} {id} is not editable.")
  889. return False
  890. return True
  891. def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
  892. args = payload.get("args")
  893. if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
  894. return
  895. data = args[0]
  896. error_var = payload.get("error_id")
  897. entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
  898. if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
  899. return
  900. entity: DataNode = core_get(entity_id)
  901. if isinstance(entity, DataNode):
  902. try:
  903. entity.write(
  904. parser.parse(data.get("value"))
  905. if data.get("type") == "date"
  906. else int(data.get("value"))
  907. if data.get("type") == "int"
  908. else float(data.get("value"))
  909. if data.get("type") == "float"
  910. else data.get("value"),
  911. comment=data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT),
  912. )
  913. entity.unlock_edit(self.gui._get_client_id())
  914. _GuiCoreContext.__assign_var(state, error_var, "")
  915. except Exception as e:
  916. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode value. {e}")
  917. _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id) # this will update the data value
  918. def tabular_data_edit(self, state: State, var_name: str, payload: dict):
  919. error_var = payload.get("error_id")
  920. user_data = payload.get("user_data", {})
  921. dn_id = user_data.get("dn_id")
  922. if not self.__check_readable_editable(state, dn_id, "DataNode", error_var):
  923. return
  924. datanode = core_get(dn_id) if dn_id else None
  925. if isinstance(datanode, DataNode):
  926. try:
  927. idx = t.cast(int, payload.get("index"))
  928. col = t.cast(str, payload.get("col"))
  929. tz = payload.get("tz")
  930. val = (
  931. parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
  932. if tz is not None
  933. else payload.get("value")
  934. )
  935. # user_value = payload.get("user_value")
  936. data = self.__read_tabular_data(datanode)
  937. new_data: t.Any = None
  938. if isinstance(data, (pd.DataFrame, pd.Series)):
  939. if isinstance(data, pd.DataFrame):
  940. data.at[idx, col] = val
  941. elif isinstance(data, pd.Series):
  942. data.at[idx] = val
  943. new_data = data
  944. else:
  945. data_tuple = False
  946. if isinstance(data, tuple):
  947. data_tuple = True
  948. data = list(data)
  949. if isinstance(data, list):
  950. row = data[idx]
  951. row_tuple = False
  952. if isinstance(row, tuple):
  953. row = list(row)
  954. row_tuple = True
  955. if isinstance(row, list):
  956. row[int(col)] = val
  957. if row_tuple:
  958. data[idx] = tuple(row)
  959. new_data = data
  960. elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
  961. data[idx] = val
  962. new_data = data
  963. else:
  964. _GuiCoreContext.__assign_var(
  965. state,
  966. error_var,
  967. "Error updating Datanode: cannot handle multi-column list value.",
  968. )
  969. if data_tuple and new_data is not None:
  970. new_data = tuple(new_data)
  971. else:
  972. _GuiCoreContext.__assign_var(
  973. state,
  974. error_var,
  975. "Error updating Datanode tabular value: type does not support at[] indexer.",
  976. )
  977. if new_data is not None:
  978. datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
  979. _GuiCoreContext.__assign_var(state, error_var, "")
  980. except Exception as e:
  981. _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode tabular value. {e}")
  982. _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  983. def get_data_node_properties(self, id: str):
  984. if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
  985. try:
  986. return (
  987. (
  988. (k, f"{v}")
  989. for k, v in dn._get_user_properties().items()
  990. if k != _GuiCoreContext.__PROP_ENTITY_NAME
  991. ),
  992. )
  993. except Exception:
  994. return None
  995. return None
  996. def __read_tabular_data(self, datanode: DataNode):
  997. return datanode.read()
  998. def get_data_node_tabular_data(self, id: str):
  999. if (
  1000. id
  1001. and is_readable(t.cast(DataNodeId, id))
  1002. and (dn := core_get(id))
  1003. and isinstance(dn, DataNode)
  1004. and dn.is_ready_for_reading
  1005. ):
  1006. try:
  1007. value = self.__read_tabular_data(dn)
  1008. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  1009. return value
  1010. except Exception:
  1011. return None
  1012. return None
  1013. def get_data_node_tabular_columns(self, id: str):
  1014. if (
  1015. id
  1016. and is_readable(t.cast(DataNodeId, id))
  1017. and (dn := core_get(id))
  1018. and isinstance(dn, DataNode)
  1019. and dn.is_ready_for_reading
  1020. ):
  1021. try:
  1022. value = self.__read_tabular_data(dn)
  1023. if _GuiCoreDatanodeAdapter._is_tabular_data(dn, value):
  1024. return self.gui._tbl_cols(
  1025. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=value
  1026. )
  1027. except Exception:
  1028. return None
  1029. return None
  1030. def get_data_node_chart_config(self, id: str):
  1031. if (
  1032. id
  1033. and is_readable(t.cast(DataNodeId, id))
  1034. and (dn := core_get(id))
  1035. and isinstance(dn, DataNode)
  1036. and dn.is_ready_for_reading
  1037. ):
  1038. try:
  1039. return self.gui._chart_conf(
  1040. True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
  1041. )
  1042. except Exception:
  1043. return None
  1044. return None
  1045. def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
  1046. args = payload.get("args")
  1047. if args is None or not isinstance(args, list) or len(args) < 2:
  1048. return
  1049. on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
  1050. if callable(on_action_function):
  1051. try:
  1052. entity = core_get(args[0]) if is_readable(args[0]) else f"unredable({args[0]})"
  1053. self.gui._call_function_with_state(
  1054. on_action_function,
  1055. [entity],
  1056. )
  1057. except Exception as e:
  1058. if not self.gui._call_on_exception(args[1], e):
  1059. _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
  1060. elif args[1]:
  1061. _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")
  1062. def get_creation_reason(self):
  1063. return "" if (reason := can_create()) else f"Cannot create scenario: {reason.reasons}"