123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853 |
- # Copyright 2021-2024 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- import json
- import typing as t
- from collections import defaultdict
- from numbers import Number
- from threading import Lock
- try:
- import zoneinfo
- except ImportError:
- from backports import zoneinfo # type: ignore[no-redef]
- import pandas as pd
- from dateutil import parser
- from taipy.config import Config
- from taipy.core import (
- Cycle,
- DataNode,
- DataNodeId,
- Job,
- Scenario,
- ScenarioId,
- Sequence,
- SequenceId,
- Submission,
- SubmissionId,
- cancel_job,
- create_scenario,
- delete_job,
- get_cycles_scenarios,
- get_data_nodes,
- get_jobs,
- is_deletable,
- is_editable,
- is_promotable,
- is_readable,
- is_submittable,
- set_primary,
- )
- from taipy.core import delete as core_delete
- from taipy.core import get as core_get
- from taipy.core import submit as core_submit
- from taipy.core.data._abstract_tabular import _AbstractTabularDataNode
- from taipy.core.notification import CoreEventConsumerBase, EventEntityType
- from taipy.core.notification.event import Event, EventOperation
- from taipy.core.notification.notifier import Notifier
- from taipy.core.submission.submission_status import SubmissionStatus
- from taipy.gui import Gui, State
- from taipy.gui._warnings import _warn
- from taipy.gui.gui import _DoNotUpdate
- from ._adapters import _EntityType
- class _GuiCoreContext(CoreEventConsumerBase):
- __PROP_ENTITY_ID = "id"
- __PROP_ENTITY_COMMENT = "comment"
- __PROP_CONFIG_ID = "config"
- __PROP_DATE = "date"
- __PROP_ENTITY_NAME = "name"
- __PROP_SCENARIO_PRIMARY = "primary"
- __PROP_SCENARIO_TAGS = "tags"
- __ENTITY_PROPS = (__PROP_CONFIG_ID, __PROP_DATE, __PROP_ENTITY_NAME)
- __ACTION = "action"
- _CORE_CHANGED_NAME = "core_changed"
- _SCENARIO_SELECTOR_ERROR_VAR = "gui_core_sc_error"
- _SCENARIO_SELECTOR_ID_VAR = "gui_core_sc_id"
- _SCENARIO_VIZ_ERROR_VAR = "gui_core_sv_error"
- _JOB_SELECTOR_ERROR_VAR = "gui_core_js_error"
- _DATANODE_VIZ_ERROR_VAR = "gui_core_dv_error"
- _DATANODE_VIZ_OWNER_ID_VAR = "gui_core_dv_owner_id"
- _DATANODE_VIZ_HISTORY_ID_VAR = "gui_core_dv_history_id"
- _DATANODE_VIZ_DATA_ID_VAR = "gui_core_dv_data_id"
- _DATANODE_VIZ_DATA_CHART_ID_VAR = "gui_core_dv_data_chart_id"
- _DATANODE_VIZ_DATA_NODE_PROP = "data_node"
- _DATANODE_SEL_SCENARIO_PROP = "scenario"
- def __init__(self, gui: Gui) -> None:
- self.gui = gui
- self.scenario_by_cycle: t.Optional[t.Dict[t.Optional[Cycle], t.List[Scenario]]] = None
- self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], DataNode]] = None
- self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
- self.jobs_list: t.Optional[t.List[Job]] = None
- self.client_submission: t.Dict[str, SubmissionStatus] = dict()
- # register to taipy core notification
- reg_id, reg_queue = Notifier.register()
- # locks
- self.lock = Lock()
- self.submissions_lock = Lock()
- # super
- super().__init__(reg_id, reg_queue)
- self.start()
- def process_event(self, event: Event):
- if event.entity_type == EventEntityType.SCENARIO:
- self.scenario_refresh(
- event.entity_id
- if event.operation != EventOperation.DELETION and is_readable(t.cast(ScenarioId, event.entity_id))
- else None
- )
- elif event.entity_type == EventEntityType.SEQUENCE and event.entity_id:
- sequence = None
- try:
- sequence = (
- core_get(event.entity_id)
- if event.operation != EventOperation.DELETION and is_readable(t.cast(SequenceId, event.entity_id))
- else None
- )
- if sequence and hasattr(sequence, "parent_ids") and sequence.parent_ids: # type: ignore
- self.gui._broadcast(
- _GuiCoreContext._CORE_CHANGED_NAME,
- {"scenario": [x for x in sequence.parent_ids]}, # type: ignore
- )
- except Exception as e:
- _warn(f"Access to sequence {event.entity_id} failed", e)
- elif event.entity_type == EventEntityType.JOB:
- with self.lock:
- self.jobs_list = None
- if event.operation == EventOperation.UPDATE:
- try:
- job_entity = t.cast(Job, core_get(str(event.entity_id)))
- self.gui._broadcast(
- _GuiCoreContext._CORE_CHANGED_NAME,
- {"task": {"id": job_entity.task.id, "status": job_entity.status.name}},
- )
- except Exception as e:
- _warn(f"Access to sequence {event.entity_id} failed", e)
- elif event.entity_type == EventEntityType.SUBMISSION:
- self.scenario_status_callback(event.entity_id)
- elif event.entity_type == EventEntityType.DATA_NODE:
- with self.lock:
- self.data_nodes_by_owner = None
- self.gui._broadcast(
- _GuiCoreContext._CORE_CHANGED_NAME,
- {"datanode": event.entity_id if event.operation != EventOperation.DELETION else True},
- )
- def scenario_refresh(self, scenario_id: t.Optional[str]):
- with self.lock:
- self.scenario_by_cycle = None
- self.data_nodes_by_owner = None
- self.gui._broadcast(
- _GuiCoreContext._CORE_CHANGED_NAME,
- {"scenario": scenario_id or True},
- )
- def scenario_status_callback(self, submission_id: t.Optional[str]):
- if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
- return
- try:
- last_status = self.client_submission.get(submission_id)
- if not last_status:
- return
- submission = t.cast(Submission, core_get(submission_id))
- if not submission or not submission.entity_id:
- return
- entity = core_get(submission.entity_id)
- if not entity:
- return
- new_status = submission.submission_status
- if last_status != new_status:
- # callback
- submission_name = submission.properties.get("on_submission")
- if not submission_name:
- return
- submission_fn = self.gui._get_user_function(submission_name)
- if not callable(submission_fn):
- return
- self.gui._call_user_callback(
- submission.properties.get("client_id"),
- submission_fn,
- [entity, {"submission_status": new_status.name}],
- submission.properties.get("module_context"),
- )
- with self.submissions_lock:
- if new_status in (
- SubmissionStatus.COMPLETED,
- SubmissionStatus.FAILED,
- SubmissionStatus.CANCELED,
- ):
- self.client_submission.pop(submission_id, None)
- else:
- self.client_submission[submission_id] = new_status
- except Exception as e:
- _warn(f"Submission ({submission_id}) is not available", e)
- finally:
- self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
- def scenario_adapter(self, scenario_or_cycle):
- try:
- if (
- hasattr(scenario_or_cycle, "id")
- and is_readable(scenario_or_cycle.id)
- and core_get(scenario_or_cycle.id) is not None
- ):
- if self.scenario_by_cycle and isinstance(scenario_or_cycle, Cycle):
- return (
- scenario_or_cycle.id,
- scenario_or_cycle.get_simple_label(),
- self.scenario_by_cycle.get(scenario_or_cycle),
- _EntityType.CYCLE.value,
- False,
- )
- elif isinstance(scenario_or_cycle, Scenario):
- return (
- scenario_or_cycle.id,
- scenario_or_cycle.get_simple_label(),
- None,
- _EntityType.SCENARIO.value,
- scenario_or_cycle.is_primary,
- )
- except Exception as e:
- _warn(
- f"Access to {type(scenario_or_cycle)} "
- + f"({scenario_or_cycle.id if hasattr(scenario_or_cycle, 'id') else 'No_id'})"
- + " failed",
- e,
- )
- return None
- def get_scenarios(self):
- cycles_scenarios = []
- with self.lock:
- if self.scenario_by_cycle is None:
- self.scenario_by_cycle = get_cycles_scenarios()
- for cycle, scenarios in self.scenario_by_cycle.items():
- if cycle is None:
- cycles_scenarios.extend(scenarios)
- else:
- cycles_scenarios.append(cycle)
- return cycles_scenarios
- def select_scenario(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) == 0:
- return
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ID_VAR, args[0])
- def get_scenario_by_id(self, id: str) -> t.Optional[Scenario]:
- if not id or not is_readable(t.cast(ScenarioId, id)):
- return None
- try:
- return core_get(t.cast(ScenarioId, id))
- except Exception:
- return None
- def get_scenario_configs(self):
- with self.lock:
- if self.scenario_configs is None:
- configs = Config.scenarios
- if isinstance(configs, dict):
- self.scenario_configs = [(id, f"{c.id}") for id, c in configs.items() if id != "default"]
- return self.scenario_configs
- def crud_scenario(self, state: State, id: str, payload: t.Dict[str, str]): # noqa: C901
- args = payload.get("args")
- if (
- args is None
- or not isinstance(args, list)
- or len(args) < 4
- or not isinstance(args[1], bool)
- or not isinstance(args[2], bool)
- or not isinstance(args[3], dict)
- ):
- return
- update = args[1]
- delete = args[2]
- data = args[3]
- with_dialog = True if len(args) < 5 else bool(args[4])
- scenario = None
- name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
- if update:
- scenario_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if delete:
- if not is_deletable(scenario_id):
- state.assign(
- _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Scenario. {scenario_id} is not deletable."
- )
- return
- try:
- core_delete(scenario_id)
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error deleting Scenario. {e}")
- else:
- if not self.__check_readable_editable(
- state, scenario_id, "Scenario", _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR
- ):
- return
- scenario = core_get(scenario_id)
- else:
- if with_dialog:
- config_id = data.get(_GuiCoreContext.__PROP_CONFIG_ID)
- scenario_config = Config.scenarios.get(config_id)
- if with_dialog and scenario_config is None:
- state.assign(
- _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Invalid configuration id ({config_id})"
- )
- return
- date_str = data.get(_GuiCoreContext.__PROP_DATE)
- try:
- date = parser.parse(date_str) if isinstance(date_str, str) else None
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Invalid date ({date_str}).{e}")
- return
- else:
- scenario_config = None
- date = None
- scenario_id = None
- try:
- gui: Gui = state._gui
- on_creation = args[0] if isinstance(args[0], str) else None
- on_creation_function = gui._get_user_function(on_creation) if on_creation else None
- if callable(on_creation_function):
- try:
- res = gui._call_function_with_state(
- on_creation_function,
- [
- id,
- {
- "action": on_creation,
- "config": scenario_config,
- "date": date,
- "label": name,
- "properties": {
- v.get("key"): v.get("value") for v in data.get("properties", dict())
- },
- },
- ],
- )
- if isinstance(res, Scenario):
- # everything's fine
- scenario_id = res.id
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, "")
- return
- if res:
- # do not create
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"{res}")
- return
- except Exception as e: # pragma: no cover
- if not gui._call_on_exception(on_creation, e):
- _warn(f"on_creation(): Exception raised in '{on_creation}()'", e)
- state.assign(
- _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR,
- f"Error creating Scenario with '{on_creation}()'. {e}",
- )
- return
- elif on_creation is not None:
- _warn(f"on_creation(): '{on_creation}' is not a function.")
- elif not with_dialog:
- if len(Config.scenarios) == 2:
- scenario_config = [sc for k, sc in Config.scenarios.items() if k != "default"][0]
- else:
- state.assign(
- _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR,
- "Error creating Scenario: only one scenario config needed "
- + f"({len(Config.scenarios) - 1}) found.",
- )
- return
- scenario = create_scenario(scenario_config, date, name)
- scenario_id = scenario.id
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error creating Scenario. {e}")
- finally:
- self.scenario_refresh(scenario_id)
- if scenario:
- if not is_editable(scenario):
- state.assign(
- _GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Scenario {scenario_id or name} is not editable."
- )
- return
- with scenario as sc:
- sc.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
- if props := data.get("properties"):
- try:
- new_keys = [prop.get("key") for prop in props]
- for key in t.cast(dict, sc.properties).keys():
- if key and key not in _GuiCoreContext.__ENTITY_PROPS and key not in new_keys:
- t.cast(dict, sc.properties).pop(key, None)
- for prop in props:
- key = prop.get("key")
- if key and key not in _GuiCoreContext.__ENTITY_PROPS:
- sc._properties[key] = prop.get("value")
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_SELECTOR_ERROR_VAR, f"Error creating Scenario. {e}")
- def edit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if not self.__check_readable_editable(
- state, entity_id, data.get("type", "Scenario"), _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR
- ):
- return
- entity: t.Union[Scenario, Sequence] = core_get(entity_id)
- if entity:
- try:
- if isinstance(entity, Scenario):
- primary = data.get(_GuiCoreContext.__PROP_SCENARIO_PRIMARY)
- if primary is True:
- if not is_promotable(entity):
- state.assign(
- _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Scenario {entity_id} is not promotable."
- )
- return
- set_primary(entity)
- self.__edit_properties(entity, data)
- state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error updating Scenario. {e}")
- def submit_entity(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if not is_submittable(entity_id):
- state.assign(
- _GuiCoreContext._SCENARIO_VIZ_ERROR_VAR,
- f"{data.get('type', 'Scenario')} {entity_id} is not submittable.",
- )
- return
- entity = core_get(entity_id)
- if entity:
- try:
- on_submission = data.get("on_submission_change")
- submission_entity = core_submit(
- entity,
- on_submission=on_submission,
- client_id=self.gui._get_client_id(),
- module_context=self.gui._get_locals_context(),
- )
- if on_submission:
- with self.submissions_lock:
- self.client_submission[submission_entity.id] = submission_entity.submission_status
- if Config.core.mode == "development":
- self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
- self.scenario_status_callback(submission_entity.id)
- state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error submitting entity. {e}")
- def __do_datanodes_tree(self):
- if self.data_nodes_by_owner is None:
- self.data_nodes_by_owner = defaultdict(list)
- for dn in get_data_nodes():
- self.data_nodes_by_owner[dn.owner_id].append(dn)
- def get_datanodes_tree(self, scenario: t.Optional[Scenario]):
- with self.lock:
- self.__do_datanodes_tree()
- return (
- self.data_nodes_by_owner.get(scenario.id if scenario else None, []) if self.data_nodes_by_owner else []
- ) + (self.get_scenarios() if not scenario else [])
- def data_node_adapter(self, data):
- try:
- if hasattr(data, "id") and is_readable(data.id) and core_get(data.id) is not None:
- if isinstance(data, DataNode):
- return (data.id, data.get_simple_label(), None, _EntityType.DATANODE.value, False)
- else:
- with self.lock:
- self.__do_datanodes_tree()
- if self.data_nodes_by_owner:
- if isinstance(data, Cycle):
- return (
- data.id,
- data.get_simple_label(),
- self.data_nodes_by_owner[data.id] + self.scenario_by_cycle.get(data, []),
- _EntityType.CYCLE.value,
- False,
- )
- elif isinstance(data, Scenario):
- return (
- data.id,
- data.get_simple_label(),
- self.data_nodes_by_owner[data.id] + list(data.sequences.values()),
- _EntityType.SCENARIO.value,
- data.is_primary,
- )
- elif isinstance(data, Sequence):
- if datanodes := self.data_nodes_by_owner.get(data.id):
- return (
- data.id,
- data.get_simple_label(),
- datanodes,
- _EntityType.SEQUENCE.value,
- False,
- )
- except Exception as e:
- _warn(
- f"Access to {type(data)} ({data.id if hasattr(data, 'id') else 'No_id'}) failed",
- e,
- )
- return None
- def get_jobs_list(self):
- with self.lock:
- if self.jobs_list is None:
- self.jobs_list = get_jobs()
- return self.jobs_list
- def job_adapter(self, job):
- try:
- if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:
- if isinstance(job, Job):
- entity = core_get(job.owner_id)
- return (
- job.id,
- job.get_simple_label(),
- [],
- entity.get_simple_label() if entity else "",
- entity.id if entity else "",
- job.submit_id,
- job.creation_date,
- job.status.value,
- is_deletable(job),
- is_readable(job),
- is_editable(job),
- )
- except Exception as e:
- _warn(f"Access to job ({job.id if hasattr(job, 'id') else 'No_id'}) failed", e)
- return None
- def act_on_jobs(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- job_ids = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- job_action = data.get(_GuiCoreContext.__ACTION)
- if job_action and isinstance(job_ids, list):
- errs = []
- if job_action == "delete":
- for job_id in job_ids:
- if not is_readable(job_id):
- errs.append(f"Job {job_id} is not readable.")
- continue
- if not is_deletable(job_id):
- errs.append(f"Job {job_id} is not deletable.")
- continue
- try:
- delete_job(core_get(job_id))
- except Exception as e:
- errs.append(f"Error deleting job. {e}")
- elif job_action == "cancel":
- for job_id in job_ids:
- if not is_readable(job_id):
- errs.append(f"Job {job_id} is not readable.")
- continue
- if not is_editable(job_id):
- errs.append(f"Job {job_id} is not cancelable.")
- continue
- try:
- cancel_job(job_id)
- except Exception as e:
- errs.append(f"Error canceling job. {e}")
- state.assign(_GuiCoreContext._JOB_SELECTOR_ERROR_VAR, "<br/>".join(errs) if errs else "")
- def edit_data_node(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if not self.__check_readable_editable(state, entity_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
- return
- entity: DataNode = core_get(entity_id)
- if isinstance(entity, DataNode):
- try:
- self.__edit_properties(entity, data)
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode. {e}")
- def lock_datanode_for_edit(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if not self.__check_readable_editable(state, entity_id, "Datanode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
- return
- lock = data.get("lock", True)
- entity: DataNode = core_get(entity_id)
- if isinstance(entity, DataNode):
- try:
- if lock:
- entity.lock_edit(self.gui._get_client_id())
- else:
- entity.unlock_edit(self.gui._get_client_id())
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error locking Datanode. {e}")
- def __edit_properties(self, entity: t.Union[Scenario, Sequence, DataNode], data: t.Dict[str, str]):
- with entity as ent:
- if isinstance(ent, Scenario):
- tags = data.get(_GuiCoreContext.__PROP_SCENARIO_TAGS)
- if isinstance(tags, (list, tuple)):
- ent.tags = {t for t in tags}
- name = data.get(_GuiCoreContext.__PROP_ENTITY_NAME)
- if isinstance(name, str):
- ent.properties[_GuiCoreContext.__PROP_ENTITY_NAME] = name
- props = data.get("properties")
- if isinstance(props, (list, tuple)):
- for prop in props:
- key = prop.get("key")
- if key and key not in _GuiCoreContext.__ENTITY_PROPS:
- ent.properties[key] = prop.get("value")
- deleted_props = data.get("deleted_properties")
- if isinstance(deleted_props, (list, tuple)):
- for prop in deleted_props:
- key = prop.get("key")
- if key and key not in _GuiCoreContext.__ENTITY_PROPS:
- ent.properties.pop(key, None)
- def get_scenarios_for_owner(self, owner_id: str):
- cycles_scenarios: t.List[t.Union[Scenario, Cycle]] = []
- with self.lock:
- if self.scenario_by_cycle is None:
- self.scenario_by_cycle = get_cycles_scenarios()
- if owner_id:
- if owner_id == "GLOBAL":
- for cycle, scenarios in self.scenario_by_cycle.items():
- if cycle is None:
- cycles_scenarios.extend(scenarios)
- else:
- cycles_scenarios.append(cycle)
- elif is_readable(t.cast(ScenarioId, owner_id)):
- entity = core_get(owner_id)
- if entity and (scenarios_cycle := self.scenario_by_cycle.get(t.cast(Cycle, entity))):
- cycles_scenarios.extend(scenarios_cycle)
- elif isinstance(entity, Scenario):
- cycles_scenarios.append(entity)
- return cycles_scenarios
- def get_data_node_history(self, datanode: DataNode, id: str):
- if (
- id
- and isinstance(datanode, DataNode)
- and id == datanode.id
- and (dn := core_get(id))
- and isinstance(dn, DataNode)
- ):
- res = []
- for e in dn.edits:
- job_id = e.get("job_id")
- job: t.Optional[Job] = None
- if job_id:
- if not is_readable(job_id):
- job_id += " not readable"
- else:
- job = core_get(job_id)
- res.append(
- (
- e.get("timestamp"),
- job_id if job_id else e.get("writer_identifier", ""),
- f"Execution of task {job.task.get_simple_label()}."
- if job and job.task
- else e.get("comment", ""),
- )
- )
- return sorted(res, key=lambda r: r[0], reverse=True)
- return _DoNotUpdate()
- def get_data_node_data(self, datanode: DataNode, id: str):
- if (
- id
- and isinstance(datanode, DataNode)
- and id == datanode.id
- and (dn := core_get(id))
- and isinstance(dn, DataNode)
- ):
- if dn._last_edit_date:
- if isinstance(dn, _AbstractTabularDataNode):
- return (None, None, True, None)
- try:
- value = dn.read()
- if isinstance(value, (pd.DataFrame, pd.Series)):
- return (None, None, True, None)
- return (
- value,
- "date"
- if "date" in type(value).__name__
- else type(value).__name__
- if isinstance(value, Number)
- else None,
- None,
- None,
- )
- except Exception as e:
- return (None, None, None, f"read data_node: {e}")
- return (None, None, None, f"Data unavailable for {dn.get_simple_label()}")
- return _DoNotUpdate()
- def __check_readable_editable(self, state: State, id: str, type: str, var: str):
- if not is_readable(t.cast(DataNodeId, id)):
- state.assign(var, f"{type} {id} is not readable.")
- return False
- if not is_editable(t.cast(DataNodeId, id)):
- state.assign(var, f"{type} {id} is not editable.")
- return False
- return True
- def update_data(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
- return
- data = args[0]
- entity_id = data.get(_GuiCoreContext.__PROP_ENTITY_ID)
- if not self.__check_readable_editable(state, entity_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
- return
- entity: DataNode = core_get(entity_id)
- if isinstance(entity, DataNode):
- try:
- entity.write(
- parser.parse(data.get("value"))
- if data.get("type") == "date"
- else int(data.get("value"))
- if data.get("type") == "int"
- else float(data.get("value"))
- if data.get("type") == "float"
- else data.get("value"),
- comment=data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT),
- )
- entity.unlock_edit(self.gui._get_client_id())
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
- except Exception as e:
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode value. {e}")
- state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, entity_id) # this will update the data value
- def tabular_data_edit(self, state: State, var_name: str, payload: dict):
- user_data = payload.get("user_data", {})
- dn_id = user_data.get("dn_id")
- if not self.__check_readable_editable(state, dn_id, "DataNode", _GuiCoreContext._DATANODE_VIZ_ERROR_VAR):
- return
- datanode = core_get(dn_id) if dn_id else None
- if isinstance(datanode, DataNode):
- try:
- idx = payload.get("index")
- col = payload.get("col")
- tz = payload.get("tz")
- val = (
- parser.parse(str(payload.get("value"))).astimezone(zoneinfo.ZoneInfo(tz)).replace(tzinfo=None)
- if tz is not None
- else payload.get("value")
- )
- # user_value = payload.get("user_value")
- data = self.__read_tabular_data(datanode)
- if hasattr(data, "at"):
- data.at[idx, col] = val
- datanode.write(data, comment=user_data.get(_GuiCoreContext.__PROP_ENTITY_COMMENT))
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, "")
- else:
- state.assign(
- _GuiCoreContext._DATANODE_VIZ_ERROR_VAR,
- "Error updating Datanode tabular value: type does not support at[] indexer.",
- )
- except Exception as e:
- state.assign(_GuiCoreContext._DATANODE_VIZ_ERROR_VAR, f"Error updating Datanode tabular value. {e}")
- setattr(state, _GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, dn_id)
- def __read_tabular_data(self, datanode: DataNode):
- return datanode.read()
- def get_data_node_tabular_data(self, datanode: DataNode, id: str):
- if (
- id
- and isinstance(datanode, DataNode)
- and id == datanode.id
- and is_readable(t.cast(DataNodeId, id))
- and (dn := core_get(id))
- and isinstance(dn, DataNode)
- and dn.is_ready_for_reading
- ):
- try:
- return self.__read_tabular_data(dn)
- except Exception:
- return None
- return None
- def get_data_node_tabular_columns(self, datanode: DataNode, id: str):
- if (
- id
- and isinstance(datanode, DataNode)
- and id == datanode.id
- and is_readable(t.cast(DataNodeId, id))
- and (dn := core_get(id))
- and isinstance(dn, DataNode)
- and dn.is_ready_for_reading
- ):
- try:
- return self.gui._tbl_cols(
- True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
- )
- except Exception:
- return None
- return None
- def get_data_node_chart_config(self, datanode: DataNode, id: str):
- if (
- id
- and isinstance(datanode, DataNode)
- and id == datanode.id
- and is_readable(t.cast(DataNodeId, id))
- and (dn := core_get(id))
- and isinstance(dn, DataNode)
- and dn.is_ready_for_reading
- ):
- try:
- return self.gui._chart_conf(
- True, True, "{}", json.dumps({"data": "tabular_data"}), tabular_data=self.__read_tabular_data(dn)
- )
- except Exception:
- return None
- return None
- def select_id(self, state: State, id: str, payload: t.Dict[str, str]):
- args = payload.get("args")
- if args is None or not isinstance(args, list) or len(args) == 0 and isinstance(args[0], dict):
- return
- data = args[0]
- if owner_id := data.get("owner_id"):
- state.assign(_GuiCoreContext._DATANODE_VIZ_OWNER_ID_VAR, owner_id)
- elif history_id := data.get("history_id"):
- state.assign(_GuiCoreContext._DATANODE_VIZ_HISTORY_ID_VAR, history_id)
- elif data_id := data.get("data_id"):
- state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_ID_VAR, data_id)
- elif chart_id := data.get("chart_id"):
- state.assign(_GuiCoreContext._DATANODE_VIZ_DATA_CHART_ID_VAR, chart_id)
|