_context.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import typing as t
  13. from collections import defaultdict
  14. from numbers import Number
  15. from threading import Lock
  16. try:
  17. import zoneinfo
  18. except ImportError:
  19. from backports import zoneinfo # type: ignore[no-redef]
  20. import pandas as pd
  21. from dateutil import parser
  22. from taipy.config import Config
  23. from taipy.core import (
  24. Cycle,
  25. DataNode,
  26. DataNodeId,
  27. Job,
  28. Scenario,
  29. ScenarioId,
  30. Sequence,
  31. SequenceId,
  32. Submission,
  33. SubmissionId,
  34. cancel_job,
  35. create_scenario,
  36. delete_job,
  37. get_cycles_scenarios,
  38. get_data_nodes,
  39. get_jobs,
  40. is_deletable,
  41. is_editable,
  42. is_promotable,
  43. is_readable,
  44. is_submittable,
  45. set_primary,
  46. )
  47. from taipy.core import delete as core_delete
  48. from taipy.core import get as core_get
  49. from taipy.core import submit as core_submit
  50. from taipy.core.notification import CoreEventConsumerBase, EventEntityType
  51. from taipy.core.notification.event import Event, EventOperation
  52. from taipy.core.notification.notifier import Notifier
  53. from taipy.core.submission.submission_status import SubmissionStatus
  54. from taipy.core.taipy import can_create
  55. from taipy.gui import Gui, State
  56. from taipy.gui._warnings import _warn
  57. from taipy.gui.gui import _DoNotUpdate
  58. from ._adapters import (
  59. CustomScenarioFilter,
  60. _EntityType,
  61. _get_entity_property,
  62. _GuiCoreDatanodeAdapter,
  63. _GuiCoreScenarioProperties,
  64. _invoke_action,
  65. )
  66. class _GuiCoreContext(CoreEventConsumerBase):
  67. __PROP_ENTITY_ID = "id"
  68. __PROP_ENTITY_COMMENT = "comment"
  69. __PROP_CONFIG_ID = "config"
  70. __PROP_DATE = "date"
  71. __PROP_ENTITY_NAME = "name"
  72. __PROP_SCENARIO_PRIMARY = "primary"
  73. __PROP_SCENARIO_TAGS = "tags"
  74. __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
  75. __ACTION = "action"
  76. _CORE_CHANGED_NAME = "core_changed"
  def __init__(self, gui: Gui) -> None:
      """Bind the context to *gui* and register it as a taipy core event consumer.

      Only the registration happens here: ``self.start()`` is deferred until
      ``__lazy_start`` is called for the first time.

      Args:
          gui: the Gui instance used to broadcast messages and call user code.
      """
      self.gui = gui

      # Locks guarding the caches and the submission-status map.
      self.lock = Lock()
      self.submissions_lock = Lock()

      # Cached entity collections; ``None`` means "not loaded yet".
      self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
      self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], t.List[DataNode]]] = None
      self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
      self.jobs_list: t.Optional[t.List[Job]] = None

      # Last known status of the submissions tracked here, keyed by submission id.
      self.client_submission: t.Dict[str, SubmissionStatus] = {}

      # Read by __lazy_start so that start() is called only once.
      self.__started = False

      # Register with the taipy core notifier and hand the registration to the base class.
      registration_id, registration_queue = Notifier.register()
      super().__init__(registration_id, registration_queue)
  def __lazy_start(self):
      """Call ``self.start()`` on the first invocation; later calls do nothing."""
      if not self.__started:
          # Flip the flag before starting, exactly once.
          self.__started = True
          self.start()
  def process_event(self, event: Event):
      """Dispatch a core *event* according to the type of entity it concerns.

      Scenario events trigger a scenario refresh, sequence events broadcast the
      parent scenario ids, job and data node events invalidate the matching
      cache and broadcast, submission events are forwarded to
      ``submission_status_callback``. Other entity types are ignored.
      """
      self.__lazy_start()
      entity_type = event.entity_type

      if entity_type == EventEntityType.SCENARIO:
          with self.gui._get_autorization(system=True):
              # The id is forwarded for deletions and for readable scenarios only.
              if event.operation == EventOperation.DELETION or is_readable(t.cast(ScenarioId, event.entity_id)):
                  refreshed_id = event.entity_id
              else:
                  refreshed_id = None
              self.scenario_refresh(refreshed_id)
          return

      if entity_type == EventEntityType.SEQUENCE and event.entity_id:
          try:
              with self.gui._get_autorization(system=True):
                  sequence = None
                  if event.operation != EventOperation.DELETION and is_readable(
                      t.cast(SequenceId, event.entity_id)
                  ):
                      sequence = core_get(event.entity_id)
                  if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids:  # type: ignore
                      self.broadcast_core_changed({"scenario": list(sequence.parent_ids)})
          except Exception as e:
              _warn(f"Access to sequence {event.entity_id} failed", e)
          return

      if entity_type == EventEntityType.JOB:
          with self.lock:
              # Force the job list to be reloaded on next access.
              self.jobs_list = None
          self.broadcast_core_changed({"jobs": event.entity_id})
          return

      if entity_type == EventEntityType.SUBMISSION:
          self.submission_status_callback(event.entity_id, event)
          return

      if entity_type == EventEntityType.DATA_NODE:
          with self.lock:
              # Force the data node tree to be rebuilt on next access.
              self.data_nodes_by_owner = None
          datanode_info = True if event.operation == EventOperation.DELETION else event.entity_id
          self.broadcast_core_changed({"datanode": datanode_info})
  def broadcast_core_changed(self, payload: t.Dict[str, t.Any], client_id: t.Optional[str] = None):
      """Broadcast *payload* through the Gui under the ``core_changed`` name.

      Args:
          payload: the data to send.
          client_id: optional client identifier forwarded to ``Gui._broadcast``.
      """
      message_name = _GuiCoreContext._CORE_CHANGED_NAME
      self.gui._broadcast(message_name, payload, client_id)
  def scenario_refresh(self, scenario_id: t.Optional[str]):
      """Invalidate the scenario and data node caches, then broadcast the change.

      Args:
          scenario_id: id sent in the broadcast payload; ``True`` is sent when it is empty.
      """
      with self.lock:
          self.data_nodes_by_owner = None
          self.scenario_by_cycle = None
      notified = scenario_id if scenario_id else True
      self.broadcast_core_changed({"scenario": notified})
  def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
      """React to a status change of a submission tracked in ``client_submission``.

      Nothing happens when the submission is unknown here, not readable, or has
      no entity id. Otherwise the status of each job's task is collected, the
      user's ``on_submission`` callback (if any) is invoked when the status
      changed, the tracked status is updated or dropped, and a ``core_changed``
      message is broadcast.

      Args:
          submission_id: id of the submission to inspect.
          event: optional originating event; its metadata is merged into the callback details.
      """
      if not (submission_id and is_readable(t.cast(SubmissionId, submission_id))):
          return

      submission = None
      new_status = None
      payload: t.Optional[t.Dict[str, t.Any]] = None
      client_id: t.Optional[str] = None
      # Statuses after which the submission is no longer tracked.
      final_statuses = (
          SubmissionStatus.COMPLETED,
          SubmissionStatus.FAILED,
          SubmissionStatus.CANCELED,
      )
      try:
          last_status = self.client_submission.get(submission_id)
          if not last_status:
              return

          submission = t.cast(Submission, core_get(submission_id))
          if not submission or not submission.entity_id:
              return

          payload = {}
          new_status = t.cast(SubmissionStatus, submission.submission_status)
          client_id = submission.properties.get("client_id")

          if client_id:
              running_tasks = {}
              with self.gui._get_autorization(client_id):
                  for job in submission.jobs:
                      if not isinstance(job, Job):
                          job = core_get(job)
                      if job.is_running():
                          task_status = SubmissionStatus.RUNNING.value
                      elif job.is_pending():
                          task_status = SubmissionStatus.PENDING.value
                      else:
                          task_status = None
                      running_tasks[job.task.id] = task_status
                  payload.update(tasks=running_tasks)

                  if last_status != new_status:
                      # callback
                      submission_name = submission.properties.get("on_submission")
                      if submission_name:
                          callback_args = [
                              core_get(submission.id),
                              {
                                  "submission_status": new_status.name,
                                  "submittable_entity": core_get(submission.entity_id),
                                  **(event.metadata if event else {}),
                              },
                          ]
                          self.gui.invoke_callback(
                              client_id,
                              submission_name,
                              callback_args,
                              submission.properties.get("module_context"),
                          )

          with self.submissions_lock:
              if new_status in final_statuses:
                  self.client_submission.pop(submission_id, None)
              else:
                  self.client_submission[submission_id] = new_status
      except Exception as e:
          _warn(f"Submission ({submission_id}) is not available", e)
      finally:
          # A payload exists only once the submission was found: always notify in that case.
          if payload is not None:
              payload.update(jobs=True)
              entity_id = submission.entity_id if submission else None
              if entity_id:
                  payload.update(scenario=entity_id)
                  if new_status:
                      payload.update(submission=new_status.value)
              self.broadcast_core_changed(payload, client_id)
  def no_change_adapter(self, entity: t.List):
      """Return *entity* untouched (identity adapter)."""
      unchanged = entity
      return unchanged
  def cycle_adapter(self, cycle: Cycle, sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None):
      """Adapt *cycle* into ``[id, label, sorted scenarios, cycle type value, False]``.

      Returns ``None`` when *cycle* is not a readable, existing ``Cycle``, when
      the scenario cache is empty, or when an exception occurs (a warning is
      issued in that last case).
      """
      self.__lazy_start()
      try:
          usable = (
              isinstance(cycle, Cycle)
              and is_readable(cycle.id)
              and core_get(cycle.id) is not None
              and self.scenario_by_cycle
          )
          if usable:
              cycle_id = cycle.id
              label = cycle.get_simple_label()
              children = self.get_sorted_scenario_list(self.scenario_by_cycle.get(cycle, []), sorts)
              return [cycle_id, label, children, _EntityType.CYCLE.value, False]
      except Exception as e:
          shown_id = cycle.id if hasattr(cycle, "id") else "No_id"
          _warn(f"Access to {type(cycle).__name__} ({shown_id}) failed", e)
      return None
  def scenario_adapter(self, scenario: Scenario):
      """Adapt *scenario* into ``[id, label, None, scenario type value, is_primary]``.

      Tuples and lists are returned as they are. ``None`` is returned for
      anything that is not a readable, existing ``Scenario`` or when an
      exception occurs (a warning is issued in that last case).
      """
      self.__lazy_start()
      if isinstance(scenario, (tuple, list)):
          # Already adapted.
          return scenario
      try:
          if isinstance(scenario, Scenario) and is_readable(scenario.id) and core_get(scenario.id) is not None:
              scenario_id = scenario.id
              label = scenario.get_simple_label()
              return [scenario_id, label, None, _EntityType.SCENARIO.value, scenario.is_primary]
      except Exception as e:
          shown_id = scenario.id if hasattr(scenario, "id") else "No_id"
          _warn(f"Access to {type(scenario).__name__} ({shown_id}) failed", e)
      return None
  def filter_entities(
      self, cycle_scenario: t.List, col: str, col_type: str, is_dn: bool, action: str, val: t.Any, col_fn=None
  ):
      """Filter in place the children (index 2) of an adapted entity.

      A child is kept when ``_invoke_action`` returns a truthy value for it.

      Returns:
          The same *cycle_scenario* list, with its third element replaced.
      """
      kept = []
      for child in cycle_scenario[2]:
          if _invoke_action(child, col, col_type, is_dn, action, val, col_fn):
              kept.append(child)
      cycle_scenario[2] = kept
      return cycle_scenario
  def adapt_scenarios(self, cycle: t.List):
      """Replace the children (index 2) of *cycle* by their ``scenario_adapter`` form and return *cycle*."""
      adapted = []
      for child in cycle[2]:
          adapted.append(self.scenario_adapter(child))
      cycle[2] = adapted
      return cycle
  def get_sorted_scenario_list(
      self,
      entities: t.Union[t.List[t.Union[Cycle, Scenario]], t.List[Scenario]],
      sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  ):
      """Sort *entities* and adapt the cycles found among them.

      Args:
          entities: cycles and/or scenarios to sort.
          sorts: sort descriptors (``col`` and ``order`` keys). They are applied
              last to first so that the first one is the primary criterion.
              When empty, entities are sorted on ``creation_date``.

      Returns:
          The sorted list where each ``Cycle`` is replaced by ``cycle_adapter(cycle, sorts)``.
      """
      if not sorts:
          ordered = sorted(entities, key=_get_entity_property("creation_date", Scenario, Cycle))
      else:
          ordered = entities
          for descriptor in reversed(sorts):
              column = descriptor.get("col", "")
              ascending = descriptor.get("order", True)
              ordered = sorted(ordered, key=_get_entity_property(column, Scenario, Cycle), reverse=not ascending)

      result = []
      for entity in ordered:
          result.append(self.cycle_adapter(entity, sorts) if isinstance(entity, Cycle) else entity)
      return result
  def get_filtered_scenario_list(
      self,
      entities: t.List[t.Union[t.List, Scenario]],
      filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  ):
      """Apply *filters* to a list of scenarios and adapted cycles.

      Args:
          entities: ``Scenario`` objects and/or adapted cycles (lists whose
              index 2 holds the cycle's scenarios).
          filters: filter descriptors read through the ``col``, ``type``,
              ``value`` and ``action`` keys. When empty, *entities* is returned as is.

      Returns:
          The filtered list; adapted cycles left without any scenario are removed.
      """
      if not filters:
          return entities
      # filtering
      filtered_list = list(entities)
      for fd in filters:
          col = fd.get("col", "")
          is_datanode_prop = _GuiCoreScenarioProperties.is_datanode_property(col)
          col_type = fd.get("type", "no type")
          # "name(...)" denotes a function call: keep the function name apart.
          col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
          val = fd.get("value")
          action = fd.get("action", "")
          customs = CustomScenarioFilter._get_custom(col)
          if customs:
              with self.gui._set_locals_context(customs[0] or None):
                  fn = self.gui._get_user_function(customs[1])
                  if callable(fn):
                      col = fn
          # Reject dotted names where a part is not a valid identifier.
          # The previous test, `next(filter(...), False) is True`, could never
          # succeed because filter() yields the offending string, not True.
          if isinstance(col, str) and any(not part.isidentifier() for part in (col_fn or col).split(".")):
              _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
              continue
          # level 1 filtering
          filtered_list = [
              e
              for e in filtered_list
              if not isinstance(e, Scenario)
              or _invoke_action(e, col, col_type, is_datanode_prop, action, val, col_fn)
          ]
          # level 2 filtering
          filtered_list = [
              e
              if isinstance(e, Scenario)
              else self.filter_entities(e, col, col_type, is_datanode_prop, action, val, col_fn)
              for e in filtered_list
          ]
      # remove empty cycles
      return [e for e in filtered_list if isinstance(e, Scenario) or (isinstance(e, (tuple, list)) and len(e[2]))]
  def get_scenarios(
      self,
      scenarios: t.Optional[t.List[t.Union[Cycle, Scenario]]],
      filters: t.Optional[t.List[t.Dict[str, t.Any]]],
      sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  ):
      """Return the sorted then filtered list of cycles and scenarios.

      Args:
          scenarios: explicit entities to use; when ``None``, every cycle plus
              the scenarios attached to no cycle are used.
          filters: filter descriptors passed to ``get_filtered_scenario_list``.
          sorts: sort descriptors passed to ``get_sorted_scenario_list``.
      """
      self.__lazy_start()
      cycles_scenarios: t.List[t.Union[Cycle, Scenario]] = []
      with self.lock:
          # always needed to get scenarios for a cycle in cycle_adapter
          if self.scenario_by_cycle is None:
              self.scenario_by_cycle = get_cycles_scenarios()
          if scenarios is None:
              for cycle, cycle_scenarios in self.scenario_by_cycle.items():
                  if cycle is not None:
                      cycles_scenarios.append(cycle)
                  else:
                      # Scenarios without a cycle appear at the top level.
                      cycles_scenarios.extend(cycle_scenarios)
      if scenarios is not None:
          cycles_scenarios = scenarios
      sorted_entities = self.get_sorted_scenario_list(cycles_scenarios, sorts)
      return self.get_filtered_scenario_list(sorted_entities, filters)
  def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
      """Assign ``args[1]`` to the state variable named ``args[0]``.

      Nothing is done unless ``payload["args"]`` is a list of at least two items.
      """
      self.__lazy_start()
      args = payload.get("args")
      if isinstance(args, list) and len(args) >= 2:
          state.assign(args[0], args[1])
  def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
      """Return the scenario identified by *id*.

      ``None`` is returned when *id* is empty, not readable, or when the
      retrieval raises an exception.
      """
      self.__lazy_start()
      scenario_id = t.cast(ScenarioId, id)
      if id and is_readable(scenario_id):
          try:
              return core_get(scenario_id)
          except Exception:
              return None
      return None
  def get_scenario_configs(self):
      """Return the cached ``(key, config id)`` pairs of the scenario configurations.

      The list is built from ``Config.scenarios`` on first use, leaving out the
      ``"default"`` entry. It stays ``None`` if ``Config.scenarios`` is not a dict.
      """
      self.__lazy_start()
      with self.lock:
          if self.scenario_configs is None:
              configs = Config.scenarios
              if isinstance(configs, dict):
                  pairs = []
                  for config_key, config in configs.items():
                      if config_key != "default":
                          pairs.append((config_key, f"{config.id}"))
                  self.scenario_configs = pairs
          return self.scenario_configs
  def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]):  # noqa: C901
      """Create, update or delete a scenario on behalf of the GUI.

      ``payload["args"]`` is expected to hold, in order: the name of the user's
      ``on_creation`` function (or a non-string), the name of the variable that
      receives the selected scenario (or a non-string), the *update* flag, the
      *delete* flag, the data dict and, optionally, a *with dialog* flag
      (defaults to ``True``). The call is ignored if this layout is not met.

      Messages are written to the state variable named by ``payload["error_id"]``
      when it is provided; an empty string denotes success.

      Args:
          state: the state of the requesting client.
          id: identifier passed on to the user's ``on_creation`` function.
          payload: the action payload (``args`` and ``error_id`` keys).
      """
      self.__lazy_start()
      args = payload.get("args")
      start_idx = 2
      if (
          args is None
          or not isinstance(args, list)
          or len(args) < start_idx + 3
          or not isinstance(args[start_idx], bool)
          or not isinstance(args[start_idx + 1], bool)
          or not isinstance(args[start_idx + 2], dict)
      ):
          return
      error_var = payload.get("error_id")
      update = args[start_idx]
      delete = args[start_idx + 1]
      data = args[start_idx + 2]
      with_dialog = True if len(args) < start_idx + 4 else bool(args[start_idx + 3])
      scenario = None
      user_scenario = None
      # Messages go through __assign_var so that a missing error variable is tolerated.
      assign_error = _GuiCoreContext.__assign_var

      name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
      if update:
          scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
          if delete:
              if not is_deletable(scenario_id):
                  assign_error(state, error_var, f"Scenario. {scenario_id} is not deletable.")
                  return
              try:
                  core_delete(scenario_id)
              except Exception as e:
                  assign_error(state, error_var, f"Error deleting Scenario. {e}")
          else:
              if not self.__check_readable_editable(state, scenario_id, "Scenario", error_var):
                  return
              scenario = core_get(scenario_id)
      else:
          if with_dialog:
              config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
              scenario_config = Config.scenarios.get(config_id)
              if scenario_config is None:
                  assign_error(state, error_var, f"Invalid configuration id ({config_id})")
                  return
              date_str = data.get(_GuiCoreContext.__PROP_DATE)
              try:
                  date = parser.parse(date_str) if isinstance(date_str, str) else None
              except Exception as e:
                  assign_error(state, error_var, f"Invalid date ({date_str}).{e}")
                  return
          else:
              scenario_config = None
              date = None
          scenario_id = None
          try:
              gui = state.get_gui()
              on_creation = args[0] if isinstance(args[0], str) else None
              on_creation_function = gui._get_user_function(on_creation) if on_creation else None
              if callable(on_creation_function):
                  try:
                      res = gui._call_function_with_state(
                          on_creation_function,
                          [
                              id,
                              {
                                  "action": on_creation,
                                  "config": scenario_config,
                                  "date": date,
                                  "label": name,
                                  "properties": {v.get("key"): v.get("value") for v in data.get("properties", {})},
                              },
                          ],
                      )
                      if isinstance(res, Scenario):
                          # everything's fine
                          user_scenario = res
                          scenario_id = user_scenario.id
                          assign_error(state, error_var, "")
                          return
                      if res:
                          # do not create
                          assign_error(state, error_var, f"{res}")
                          return
                  except Exception as e:  # pragma: no cover
                      if not gui._call_on_exception(on_creation, e):
                          _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
                      assign_error(
                          state,
                          error_var,
                          f"Error creating Scenario with '{on_creation}()'. {e}",
                      )
                      return
              elif on_creation is not None:
                  _warn(f"on_creation(): '{on_creation}' is not a function.")
              elif not with_dialog:
                  # Without a dialog, the configuration is picked automatically, which
                  # requires exactly one configuration besides "default".
                  if len(Config.scenarios) == 2:
                      scenario_config = next(sc for k, sc in Config.scenarios.items() if k != "default")
                  else:
                      assign_error(
                          state,
                          error_var,
                          "Error creating Scenario: only one scenario config needed "
                          + f"({len(Config.scenarios) - 1}) found.",
                      )
                      return
              scenario = create_scenario(scenario_config, date, name)
              scenario_id = scenario.id
          except Exception as e:
              assign_error(state, error_var, f"Error creating Scenario. {e}")
          finally:
              # Runs on every exit path, including the early returns above.
              self.scenario_refresh(scenario_id)
              if (scenario or user_scenario) and (sel_scenario_var := args[1] if isinstance(args[1], str) else None):
                  try:
                      var_name, _ = gui._get_real_var_name(sel_scenario_var)
                      state.assign(var_name, scenario or user_scenario)
                  except Exception as e:  # pragma: no cover
                      _warn("Can't find value variable name in context", e)
      if scenario:
          if not is_editable(scenario):
              assign_error(state, error_var, f"Scenario {scenario_id or name} is not editable.")
              return
          with scenario as sc:
              sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
              if props := data.get("properties"):
                  try:
                      new_keys = [prop.get("key") for prop in props]
                      # Iterate over a snapshot: popping from a mapping while iterating
                      # over its keys raises RuntimeError.
                      for key in list(t.cast(dict, sc.properties).keys()):
                          if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
                              t.cast(dict, sc.properties).pop(key, None)
                      for prop in props:
                          key = prop.get("key")
                          if key and key not in _GuiCoreContext.__ENTITY_PROPS:
                              sc._properties[key] = prop.get("value")
                      assign_error(state, error_var, "")
                  except Exception as e:
                      assign_error(state, error_var, f"Error creating Scenario. {e}")
  @staticmethod
  def __assign_var(state: State, var_name: t.Optional[str], msg: str):
      """Assign *msg* to the state variable *var_name*; do nothing when *var_name* is empty."""
      if not var_name:
          return
      state.assign(var_name, msg)
  def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
      """Edit a scenario or one of its sequences from the data dict in ``payload["args"][0]``.

      Depending on the data: a sequence is removed, renamed and updated, or
      added; otherwise the scenario is optionally promoted to primary and its
      properties are edited. The outcome is written to the variable named by
      ``payload["error_id"]`` (empty string on success).
      """
      self.__lazy_start()
      args = payload.get("args")
      if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
          return
      error_var = payload.get("error_id")
      data = args[0]
      entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
      sequence = data.get("sequence")
      if not self.__check_readable_editable(state, entity_id, "Scenario", error_var):
          return
      scenario: Scenario = core_get(entity_id)
      if not scenario:
          return
      try:
          if sequence:
              # An existing sequence is targeted.
              if data.get("del", False):
                  scenario.remove_sequence(sequence)
              else:
                  new_name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
                  if sequence != new_name:
                      scenario.rename_sequence(sequence, new_name)
                  seq_entity = scenario.sequences.get(new_name)
                  if not seq_entity:
                      _GuiCoreContext.__assign_var(
                          state,
                          error_var,
                          f"Sequence {new_name} is not available in Scenario {entity_id}.",
                      )
                      return
                  seq_entity.tasks = data.get("task_ids")
                  self.__edit_properties(seq_entity, data)
          elif isinstance(sequence, str) and (new_name := data.get(_GuiCoreContext.__PROP_ENTITY_NAME)):
              # An empty sequence string together with a name: add a sequence.
              scenario.add_sequence(new_name, data.get("task_ids"))
          else:
              if data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY) is True:
                  if not is_promotable(scenario):
                      _GuiCoreContext.__assign_var(state, error_var, f"Scenario {entity_id} is not promotable.")
                      return
                  set_primary(scenario)
              self.__edit_properties(scenario, data)
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating {type(scenario).__name__}. {e}")
  def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
      """Submit the scenario, or one of its sequences, described by ``payload["args"][0]``.

      The resulting submission status is recorded in ``client_submission``.
      When ``Config.core.mode`` is ``"development"``, the status is reset to
      ``SUBMITTED`` and ``submission_status_callback`` is called right away.
      The outcome is written to the variable named by ``payload["error_id"]``.
      """
      self.__lazy_start()
      args = payload.get("args")
      if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
          return
      data = args[0]
      error_var = payload.get("error_id")
      try:
          scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
          entity = core_get(scenario_id)
          sequence = data.get("sequence")
          if sequence:
              entity = entity.sequences.get(sequence)
          reason = is_submittable(entity)
          if not reason:
              entity_kind = "Sequence" if sequence else "Scenario"
              _GuiCoreContext.__assign_var(
                  state,
                  error_var,
                  f"{entity_kind} {sequence or scenario_id} is not submittable: " + reason.reasons,
              )
              return
          if entity:
              submission_entity = core_submit(
                  entity,
                  on_submission=data.get("on_submission_change"),
                  client_id=self.gui._get_client_id(),
                  module_context=self.gui._get_locals_context(),
              )
              with self.submissions_lock:
                  self.client_submission[submission_entity.id] = submission_entity.submission_status
              if Config.core.mode == "development":
                  with self.submissions_lock:
                      self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
                  self.submission_status_callback(submission_entity.id)
              _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error submitting entity. {e}")
  def get_filtered_datanode_list(
      self,
      entities: t.List[t.Union[t.List, DataNode]],
      filters: t.Optional[t.List[t.Dict[str, t.Any]]],
  ):
      """Apply *filters* to a list of data nodes and adapted parent entities.

      Args:
          entities: ``DataNode`` objects and/or adapted entities (lists whose
              index 2 holds their children).
          filters: filter descriptors read through the ``col``, ``type``,
              ``value`` and ``action`` keys. When *filters* or *entities* is
              empty, *entities* is returned as is.

      Returns:
          The filtered list; adapted entities left without any child are removed.
      """
      if not filters or not entities:
          return entities
      # filtering
      filtered_list = list(entities)
      for fd in filters:
          col = fd.get("col", "")
          col_type = fd.get("type", "no type")
          # "name(...)" denotes a function call: keep the function name apart.
          col_fn = cp[0] if (cp := col.split("(")) and len(cp) > 1 else None
          val = fd.get("value")
          action = fd.get("action", "")
          customs = CustomScenarioFilter._get_custom(col)
          if customs:
              with self.gui._set_locals_context(customs[0] or None):
                  fn = self.gui._get_user_function(customs[1])
                  if callable(fn):
                      col = fn
          # Reject dotted names where a part is not a valid identifier.
          # The previous test, `next(filter(...), False) is True`, could never
          # succeed because filter() yields the offending string, not True.
          if isinstance(col, str) and any(not part.isidentifier() for part in (col_fn or col).split(".")):
              _warn(f'Error filtering with "{col_fn or col}": not a valid Python identifier.')
              continue
          # level 1 filtering
          filtered_list = [
              e
              for e in filtered_list
              if not isinstance(e, DataNode) or _invoke_action(e, col, col_type, False, action, val, col_fn)
          ]
          # level 3 filtering
          # Data nodes already went through level 1: keep each of them once, without
          # indexing into it (the former nested comprehension evaluated `e[2]` on them).
          # NOTE(review): for adapted entities, the filtered children replace their
          # parent in the result, as before - confirm this flattening is intended.
          next_list: t.List = []
          for e in filtered_list:
              if isinstance(e, DataNode):
                  next_list.append(e)
              else:
                  next_list.extend(
                      self.filter_entities(d, col, col_type, False, action, val, col_fn) for d in e[2]
                  )
          filtered_list = next_list
      # remove empty cycles
      return [e for e in filtered_list if isinstance(e, DataNode) or (isinstance(e, (tuple, list)) and len(e[2]))]
  def get_sorted_datanode_list(
      self,
      entities: t.Union[
          t.List[t.Union[Cycle, Scenario, DataNode]], t.List[t.Union[Scenario, DataNode]], t.List[DataNode]
      ],
      sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
      adapt_dn=False,
  ):
      """Sort *entities* and pass each of them through ``data_node_adapter``.

      Args:
          entities: the entities to sort; returned as is when empty.
          sorts: sort descriptors (``col`` and ``order`` keys), applied last to
              first. When empty, the original order is kept.
          adapt_dn: forwarded to ``data_node_adapter``.
      """
      if not entities:
          return entities
      ordered = entities
      for descriptor in reversed(sorts or []):
          column = descriptor.get("col", "")
          ascending = descriptor.get("order", True)
          ordered = sorted(ordered, key=_get_entity_property(column, DataNode), reverse=not ascending)
      adapted = []
      for entity in ordered:
          adapted.append(self.data_node_adapter(entity, sorts, adapt_dn))
      return adapted
  def __do_datanodes_tree(self):
      """Build ``data_nodes_by_owner`` (data nodes grouped by ``owner_id``) if it is not set."""
      if self.data_nodes_by_owner is not None:
          return
      by_owner = defaultdict(list)
      # Publish the mapping before filling it, as the attribute is what callers read.
      self.data_nodes_by_owner = by_owner
      for data_node in get_data_nodes():
          by_owner[data_node.owner_id].append(data_node)
  def get_datanodes_tree(
      self,
      scenarios: t.Optional[t.Union[Scenario, t.List[Scenario]]],
      datanodes: t.Optional[t.List[DataNode]],
      filters: t.Optional[t.List[t.Dict[str, t.Any]]],
      sorts: t.Optional[t.List[t.Dict[str, t.Any]]],
  ):
      """Return the sorted then filtered data node tree.

      The base list is, in order of precedence: *datanodes* when provided; the
      data nodes without owner plus all cycles/scenarios when *scenarios* is
      ``None``; *scenarios* itself when it holds more than one item; otherwise
      the data nodes owned by the given scenario(s).
      """
      self.__lazy_start()
      with self.lock:
          # The lock is released before get_scenarios() below, which acquires it too.
          self.__do_datanodes_tree()

      if datanodes is not None:
          base_list = datanodes
      elif scenarios is None:
          base_list = (self.data_nodes_by_owner or {}).get(None, []) + (
              self.get_scenarios(None, None, None) or []
          )
      elif isinstance(scenarios, (list, tuple)) and len(scenarios) > 1:
          base_list = scenarios
      elif self.data_nodes_by_owner:
          owners = scenarios if isinstance(scenarios, (list, tuple)) else [scenarios]
          base_list = []
          for owner in owners:
              base_list.extend(self.data_nodes_by_owner.get(owner.id, []))
      else:
          base_list = []

      sorted_entities = self.get_sorted_datanode_list(base_list, sorts)
      return self.get_filtered_datanode_list(sorted_entities, filters)
  def data_node_adapter(
      self,
      data: t.Union[Cycle, Scenario, Sequence, DataNode],
      sorts: t.Optional[t.List[t.Dict[str, t.Any]]] = None,
      adapt_dn=True,
  ):
      """Adapt a cycle, scenario, sequence or data node for the data node tree.

      Args:
          data: the entity to adapt. A list is taken as already adapted; only
              its children (index 2) are sorted when they are core entities.
          sorts: sort descriptors forwarded to ``get_sorted_datanode_list``.
          adapt_dn: when false, a ``DataNode`` is returned as is instead of
              being turned into a list.

      Returns:
          The adapted list (or the data node itself), or ``None`` when the
          entity is not readable, does not exist, has nothing to show, or an
          exception occurred.

      Raises:
          NotImplementedError: when *data* is a tuple.
      """
      self.__lazy_start()
      if isinstance(data, tuple):
          raise NotImplementedError
      if isinstance(data, list):
          children = data[2]
          if children and isinstance(children[0], (Cycle, Scenario, Sequence, DataNode)):
              data[2] = self.get_sorted_datanode_list(children, sorts, False)
          return data
      try:
          if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
              if isinstance(data, DataNode):
                  if not adapt_dn:
                      return data
                  return [data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False]

              with self.lock:
                  self.__do_datanodes_tree()

              if self.data_nodes_by_owner:
                  if isinstance(data, Cycle):
                      cycle_id = data.id
                      label = data.get_simple_label()
                      content = self.get_sorted_datanode_list(
                          self.data_nodes_by_owner.get(data.id, []) + (self.scenario_by_cycle or {}).get(data, []),
                          sorts,
                          False,
                      )
                      return [cycle_id, label, content, _EntityType.CYCLE.value, False]
                  if isinstance(data, Scenario):
                      scenario_id = data.id
                      label = data.get_simple_label()
                      content = self.get_sorted_datanode_list(
                          self.data_nodes_by_owner.get(data.id, []) + list(data.sequences.values()),
                          sorts,
                          False,
                      )
                      return [scenario_id, label, content, _EntityType.SCENARIO.value, data.is_primary]
                  if isinstance(data, Sequence):
                      # A sequence without any data node is not shown.
                      datanodes = self.data_nodes_by_owner.get(data.id)
                      if datanodes:
                          return [
                              data.id,
                              data.get_simple_label(),
                              self.get_sorted_datanode_list(datanodes, sorts, False),
                              _EntityType.SEQUENCE.value,
                          ]
      except Exception as e:
          shown_id = data.id if hasattr(data, "id") else "No_id"
          _warn(f"Access to {type(data)} ({shown_id}) failed", e)
      return None
  def get_jobs_list(self):
      """Return the cached list of jobs, loading it with ``get_jobs()`` when it is not set."""
      self.__lazy_start()
      with self.lock:
          cached = self.jobs_list
          if cached is None:
              cached = get_jobs()
              self.jobs_list = cached
          return cached
  def job_adapter(self, job):
      """Adapt *job* into the tuple expected by the job selector.

      The tuple holds: id, label, an empty list, the owner's label and id
      (empty strings when the owner is not found), the submit id, the creation
      date, the status value, and the deletable / readable / editable flags.

      Returns:
          The tuple, or ``None`` when *job* is not a readable, existing ``Job``
          or when an exception occurred (a warning is then issued).
      """
      self.__lazy_start()
      try:
          reachable = hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None
          if reachable and isinstance(job, Job):
              owner = core_get(job.owner_id)
              job_id = job.id
              job_label = job.get_simple_label()
              owner_label = owner.get_simple_label() if owner else ""
              owner_id = owner.id if owner else ""
              return (
                  job_id,
                  job_label,
                  [],
                  owner_label,
                  owner_id,
                  job.submit_id,
                  job.creation_date,
                  job.status.value,
                  is_deletable(job),
                  is_readable(job),
                  is_editable(job),
              )
      except Exception as e:
          shown_id = job.id if hasattr(job, "id") else "No_id"
          _warn(f"Access to job ({shown_id}) failed", e)
      return None
  def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
      """Delete or cancel the jobs listed in a GUI action payload.

      ``payload["args"][0]`` must be a dict holding the job ids (a list) and
      the action name ("delete" or "cancel"). Each job is checked for
      readability and for the permission matching the action before the
      operation is attempted. All failure messages are joined with ``<br/>``
      and assigned to the state variable named by ``payload["error_id"]``
      (an empty string when nothing failed).

      Nothing happens when the payload is malformed, the action is empty, or
      the ids are not a list.
      """
      self.__lazy_start()
      args = payload.get("args")
      if not (isinstance(args, list) and len(args) > 0 and isinstance(args[0], dict)):
          return
      request = args[0]
      job_ids = request.get(_GuiCoreContext.__PROP_ENTITY_ID)
      job_action = request.get(_GuiCoreContext.__ACTION)
      if not job_action or not isinstance(job_ids, list):
          return

      failures = []
      if job_action == "delete":
          for job_id in job_ids:
              if not is_readable(job_id):
                  failures.append(f"Job {job_id} is not readable.")
              elif not is_deletable(job_id):
                  failures.append(f"Job {job_id} is not deletable.")
              else:
                  try:
                      delete_job(core_get(job_id))
                  except Exception as e:
                      failures.append(f"Error deleting job. {e}")
      elif job_action == "cancel":
          for job_id in job_ids:
              if not is_readable(job_id):
                  failures.append(f"Job {job_id} is not readable.")
              elif not is_editable(job_id):
                  failures.append(f"Job {job_id} is not cancelable.")
              else:
                  try:
                      cancel_job(job_id)
                  except Exception as e:
                      failures.append(f"Error canceling job. {e}")
      # Joining an empty list yields "", which clears the error variable.
      _GuiCoreContext.__assign_var(state, payload.get("error_id"), "<br/>".join(failures))
  def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
      """Apply the property edits carried by a GUI payload to a data node.

      ``payload["args"][0]`` is the dict of edits and must contain the id of
      the data node. The node must be readable and editable; otherwise the
      reason is reported through the state variable named by
      ``payload["error_id"]``. On success that variable is cleared, on failure
      it receives an "Error updating Datanode." message.
      """
      self.__lazy_start()
      args = payload.get("args")
      if not (isinstance(args, list) and len(args) > 0 and isinstance(args[0], dict)):
          return
      error_var = payload.get("error_id")
      edits = args[0]
      entity_id = edits.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
          return
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          self.__edit_properties(entity, edits)
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode. {e}")
  def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
      """Lock or unlock a data node for edition on behalf of the current client.

      ``payload["args"][0]`` carries the data node id and an optional "lock"
      flag (defaults to ``True``); a falsy flag unlocks instead. The client id
      comes from ``self.gui._get_client_id()``. The outcome is reported through
      the state variable named by ``payload["error_id"]``: cleared on success,
      set to an error message otherwise.
      """
      self.__lazy_start()
      args = payload.get("args")
      if not (isinstance(args, list) and len(args) > 0 and isinstance(args[0], dict)):
          return
      request = args[0]
      error_var = payload.get("error_id")
      entity_id = request.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "Datanode", error_var):
          return
      should_lock = request.get("lock", True)
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          toggle = entity.lock_edit if should_lock else entity.unlock_edit
          toggle(self.gui._get_client_id())
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error locking Datanode. {e}")
  def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
      """Apply tags, name and custom-property edits from ``data`` to ``entity``.

      All changes are made inside the entity's own context manager.

      * Tags are only applied to scenarios, and only when given as a list or
        tuple.
      * The name is set as an attribute when the entity has one, otherwise it
        is stored in the entity's properties.
      * ``data["properties"]`` items (``{"key": ..., "value": ...}``) are
        written to the properties; ``data["deleted_properties"]`` items are
        removed from them. Keys listed in the reserved entity properties are
        ignored in both cases.
      """
      name_key = _GuiCoreContext.__PROP_ENTITY_NAME
      with entity as ent:
          if isinstance(ent, Scenario):
              tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
              if isinstance(tags, (list, tuple)):
                  ent.tags = set(tags)

          new_name = data.get(name_key)
          if isinstance(new_name, str):
              if hasattr(ent, name_key):
                  setattr(ent, name_key, new_name)
              else:
                  ent.properties[name_key] = new_name

          upserts = data.get("properties")
          if isinstance(upserts, (list, tuple)):
              for item in upserts:
                  prop_key = item.get("key")
                  if prop_key and prop_key not in _GuiCoreContext.__ENTITY_PROPS:
                      ent.properties[prop_key] = item.get("value")

          removals = data.get("deleted_properties")
          if isinstance(removals, (list, tuple)):
              for item in removals:
                  prop_key = item.get("key")
                  if prop_key and prop_key not in _GuiCoreContext.__ENTITY_PROPS:
                      ent.properties.pop(prop_key, None)
  def get_scenarios_for_owner(self, owner_id: str):
      """Return the cycles/scenarios attached to ``owner_id``, sorted by creation date.

      * ``"GLOBAL"``: every cycle, plus the scenarios stored under the ``None``
        cycle key.
      * a readable id that maps to an entry of the cycle/scenario cache: the
        scenarios of that entry.
      * a readable id that resolves to a ``Scenario``: that scenario alone.
      * anything else (including an empty id): an empty list.

      The cycle/scenario cache is loaded with ``get_cycles_scenarios()`` on
      first use, under ``self.lock``.
      """
      self.__lazy_start()
      found: t.List[t.Union[Scenario, Cycle]] = []
      with self.lock:
          if self.scenario_by_cycle is None:
              self.scenario_by_cycle = get_cycles_scenarios()
          if owner_id == "GLOBAL":
              for cycle, scenarios in self.scenario_by_cycle.items():
                  if cycle is None:
                      found.extend(scenarios)
                  else:
                      found.append(cycle)
          elif owner_id and is_readable(t.cast(ScenarioId, owner_id)):
              owner = core_get(owner_id)
              owned = self.scenario_by_cycle.get(t.cast(Cycle, owner)) if owner else None
              if owned:
                  found.extend(owned)
              elif isinstance(owner, Scenario):
                  found.append(owner)
      found.sort(key=_get_entity_property("creation_date", Scenario))
      return found
  def get_data_node_history(self, id: str):
      """Return the edit history of a data node, most recent edit first.

      Each entry is a tuple ``(timestamp, author, comment)``:

      * ``author`` is the id of the job recorded on the edit (suffixed with
        " not readable" when that job is not readable) or, when the edit has
        no job id, its "writer_identifier" (empty string if absent);
      * ``comment`` is "Execution of task <label>." when the job is readable
        and has a task, otherwise the edit's "comment" (empty string if
        absent).

      Returns ``_DoNotUpdate()`` when ``id`` is empty, is not readable, or does
      not resolve to a ``DataNode``.
      """
      self.__lazy_start()
      # Same readability guard as the other data-node getters of this class:
      # the history of a data node that may not be read is never exposed.
      if id and is_readable(t.cast(DataNodeId, id)) and (dn := core_get(id)) and isinstance(dn, DataNode):
          history = []
          for edit in dn.edits:
              job_id = edit.get("job_id")
              job: t.Optional[Job] = None
              if job_id:
                  if not is_readable(job_id):
                      job_id += " not readable"
                  else:
                      job = core_get(job_id)
              history.append(
                  (
                      edit.get("timestamp"),
                      job_id if job_id else edit.get("writer_identifier", ""),
                      f"Execution of task {job.task.get_simple_label()}."
                      if job and job.task
                      else edit.get("comment", ""),
                  )
              )
          # Edits without a timestamp sort last instead of raising TypeError
          # on a None/datetime comparison; the order of timestamped edits is
          # the same as a plain descending sort on the timestamp.
          return sorted(history, key=lambda entry: (entry[0] is not None, entry[0]), reverse=True)
      return _DoNotUpdate()
  def __check_readable_editable(self, state: State, id: str, ent_type: str, var: t.Optional[str]):
      """Tell whether the entity ``id`` is both readable and editable.

      Readability is checked first. When a check fails, a message built from
      ``ent_type`` and ``id`` is assigned to the state variable ``var`` and
      ``False`` is returned; otherwise ``True`` is returned and the state is
      left untouched.
      """
      target = t.cast(ScenarioId, id)
      problem = None
      if not is_readable(target):
          problem = f"{ent_type} {id} is not readable."
      elif not is_editable(target):
          problem = f"{ent_type} {id} is not editable."
      if problem is None:
          return True
      _GuiCoreContext.__assign_var(state, var, problem)
      return False
  def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
      """Write a new scalar value, sent by the GUI, into a data node.

      ``payload["args"][0]`` carries the data node id, the new "value", its
      "type" ("date", "int", "float" or anything else for no conversion) and
      an optional comment. After a successful write the edit lock held by the
      current client is released and the error variable
      (``payload["error_id"]``) is cleared; on failure it receives an error
      message. In both cases the variable named by ``payload["data_id"]`` is
      assigned the data node id.
      """
      self.__lazy_start()
      args = payload.get("args")
      if not (isinstance(args, list) and len(args) > 0 and isinstance(args[0], dict)):
          return
      request = args[0]
      error_var = payload.get("error_id")
      entity_id = request.get(_GuiCoreContext.__PROP_ENTITY_ID)
      if not self.__check_readable_editable(state, entity_id, "DataNode", error_var):
          return
      entity: DataNode = core_get(entity_id)
      if not isinstance(entity, DataNode):
          return
      try:
          value_type = request.get("type")
          raw_value = request.get("value")
          if value_type == "date":
              new_value = parser.parse(raw_value)
          elif value_type == "int":
              new_value = int(raw_value)
          elif value_type == "float":
              new_value = float(raw_value)
          else:
              new_value = raw_value
          entity.write(new_value, comment=request.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
          entity.unlock_edit(self.gui._get_client_id())
          _GuiCoreContext.__assign_var(state, error_var, "")
      except Exception as e:
          _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode value. {e}")
      _GuiCoreContext.__assign_var(state, payload.get("data_id"), entity_id)  # this will update the data value
  def tabular_data_edit(self, state: State, var_name: str, payload: dict):
      """Write a single edited cell back into a tabular data node.

      The payload provides the row ("index"), the column ("col"), the new
      "value", an optional time zone name ("tz") and, under "user_data", the
      data node id ("dn_id") and an optional comment. Supported data shapes
      are pandas DataFrame/Series and lists or tuples of rows (each row being
      a list, a tuple, or a single scalar when the column is "0").

      Errors are reported through the state variable named by
      ``payload["error_id"]``; it is cleared after a successful write. The
      variable named by ``payload["data_id"]`` is finally assigned the data
      node id.
      """
      self.__lazy_start()
      error_var = payload.get("error_id")
      user_data = payload.get("user_data", {})
      dn_id = user_data.get("dn_id")
      if not self.__check_readable_editable(state, dn_id, "DataNode", error_var):
          return
      datanode = core_get(dn_id) if dn_id else None
      if isinstance(datanode, DataNode):
          try:
              idx = t.cast(int, payload.get("index"))
              col = t.cast(str, payload.get("col"))
              tz = payload.get("tz")
              # With a time zone, the value is parsed as a date, converted to
              # that zone and then made naive.
              val = (
                  parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
                  if tz is not None
                  else payload.get("value")
              )
              # user_value = payload.get("user_value")
              data = self.__read_tabular_data(datanode)
              # Stays None unless one of the branches below manages the edit.
              new_data: t.Any = None
              if isinstance(data, (pd.DataFrame, pd.Series)):
                  if isinstance(data, pd.DataFrame):
                      data.at[idx, col] = val
                  elif isinstance(data, pd.Series):
                      data.at[idx] = val
                  new_data = data
              else:
                  # Tuples are immutable: edit a list copy and convert back
                  # to a tuple once the edit is done.
                  data_tuple = False
                  if isinstance(data, tuple):
                      data_tuple = True
                      data = list(data)
                  if isinstance(data, list):
                      row = data[idx]
                      row_tuple = False
                      if isinstance(row, tuple):
                          row = list(row)
                          row_tuple = True
                      if isinstance(row, list):
                          row[int(col)] = val
                          if row_tuple:
                              data[idx] = tuple(row)
                          new_data = data
                      # A scalar row is only editable as column "0".
                      elif col == "0" and (isinstance(row, (str, Number)) or "date" in type(row).__name__):
                          data[idx] = val
                          new_data = data
                      else:
                          _GuiCoreContext.__assign_var(
                              state,
                              error_var,
                              "Error updating Datanode: cannot handle multi-column list value.",
                          )
                      if data_tuple and new_data is not None:
                          new_data = tuple(new_data)
                  else:
                      _GuiCoreContext.__assign_var(
                          state,
                          error_var,
                          "Error updating Datanode tabular value: type does not support at[] indexer.",
                      )
              if new_data is not None:
                  datanode.write(new_data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
                  _GuiCoreContext.__assign_var(state, error_var, "")
          except Exception as e:
              _GuiCoreContext.__assign_var(state, error_var, f"Error updating Datanode tabular value. {e}")
      # NOTE(review): placement at function level is reconstructed from a
      # flattened source - confirm against the original file.
      _GuiCoreContext.__assign_var(state, payload.get("data_id"), dn_id)
  def get_data_node_properties(self, id: str):
      """Return the user properties of a data node as ``(key, str(value))`` pairs.

      The entity-name property is left out. The pairs are produced lazily by a
      generator, which is itself wrapped in a one-element tuple.

      Returns ``None`` when ``id`` is empty, not readable, does not resolve to
      a ``DataNode``, or when the properties cannot be retrieved.
      """
      self.__lazy_start()
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not dn or not isinstance(dn, DataNode):
          return None
      try:
          # The properties are fetched here, inside the try; only the
          # iteration over them is deferred to the consumer.
          user_props = dn._get_user_properties().items()
          pairs = (
              (key, f"{value}")
              for key, value in user_props
              if key != _GuiCoreContext.__PROP_ENTITY_NAME
          )
          return (pairs,)
      except Exception:
          return None
  def __read_tabular_data(self, datanode: DataNode):
      """Read and return the content of ``datanode``."""
      content = datanode.read()
      return content
  def get_data_node_tabular_data(self, id: str):
      """Return the content of a data node when it qualifies as tabular data.

      Returns ``None`` when ``id`` is empty, not readable, does not resolve to
      a ``DataNode`` that is ready for reading, when the content is not
      tabular, or when reading it raises.
      """
      self.__lazy_start()
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not (dn and isinstance(dn, DataNode) and dn.is_ready_for_reading):
          return None
      try:
          content = self.__read_tabular_data(dn)
          return content if _GuiCoreDatanodeAdapter._is_tabular_data(dn, content) else None
      except Exception:
          return None
  def get_data_node_tabular_columns(self, id: str):
      """Return the table column description for a tabular data node.

      The description is built by ``self.gui._tbl_cols`` from the content of
      the data node. Returns ``None`` when ``id`` is empty, not readable, does
      not resolve to a ``DataNode`` that is ready for reading, when the content
      is not tabular, or when anything raises.
      """
      self.__lazy_start()
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not (dn and isinstance(dn, DataNode) and dn.is_ready_for_reading):
          return None
      try:
          content = self.__read_tabular_data(dn)
          if not _GuiCoreDatanodeAdapter._is_tabular_data(dn, content):
              return None
          return self.gui._tbl_cols(
              True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=content
          )
      except Exception:
          return None
  def get_data_node_chart_config(self, id: str):
      """Return the chart configuration for the content of a data node.

      The configuration is built by ``self.gui._chart_conf``. Returns ``None``
      when ``id`` is empty, not readable, does not resolve to a ``DataNode``
      that is ready for reading, or when anything raises.
      """
      self.__lazy_start()
      if not id or not is_readable(t.cast(DataNodeId, id)):
          return None
      dn = core_get(id)
      if not (dn and isinstance(dn, DataNode) and dn.is_ready_for_reading):
          return None
      try:
          return self.gui._chart_conf(
              True,
              True,
              "{}",
              json.dumps({"data": "tabular_data"}),
              tabular_data=self.__read_tabular_data(dn),
          )
      except Exception:
          return None
  def on_dag_select(self, state: State, id: str, payload: t.Dict[str, str]):
      """Forward a DAG selection to the user's ``on_action`` callback.

      ``payload["args"]`` must be a list of at least two items: the id of the
      selected entity and the name of the user function to call. The function
      receives the entity returned by ``core_get`` or, when the id is not
      readable, the placeholder string ``"unreadable(<id>)"``.

      An exception raised while calling the callback is first offered to
      ``self.gui._call_on_exception``; a warning is emitted when it is not
      handled there. A warning is also emitted when the function name does not
      resolve to a callable.
      """
      self.__lazy_start()
      args = payload.get("args")
      if args is None or not isinstance(args, list) or len(args) < 2:
          return
      on_action_function = self.gui._get_user_function(args[1]) if args[1] else None
      if callable(on_action_function):
          try:
              # Placeholder spelling corrected (was "unredable").
              entity = core_get(args[0]) if is_readable(args[0]) else f"unreadable({args[0]})"
              self.gui._call_function_with_state(
                  on_action_function,
                  [entity],
              )
          except Exception as e:
              if not self.gui._call_on_exception(args[1], e):
                  _warn(f"dag.on_action(): Exception raised in '{args[1]}()' with '{args[0]}'", e)
      elif args[1]:
          _warn(f"dag.on_action(): Invalid function '{args[1]}()'.")
  def get_creation_reason(self):
      """Return why a scenario cannot be created, or "" when it can.

      The answer comes from ``can_create()``: a truthy result yields an empty
      string, a falsy one yields a message built from its ``reasons``.
      """
      self.__lazy_start()
      reason = can_create()
      if reason:
          return ""
      return f"Cannot create scenario: {reason.reasons}"