Bläddra i källkod

update DAG on Job Status change (#729)

* update DAG on Job Status change

* fix test and default value

* Fab's comments

---------

Co-authored-by: Fred Lefévère-Laoide <Fred.Lefevere-Laoide@Taipy.io>
Fred Lefévère-Laoide 1 år sedan
förälder
incheckning
53ab999186

+ 12 - 6
frontend/taipy/src/ScenarioDag.tsx

@@ -10,8 +10,8 @@ import { ZoomIn } from "@mui/icons-material";
 import createEngine from "@projectstorm/react-diagrams";
 import deepEqual from "fast-deep-equal/es6";
 
-import { DisplayModel } from "./utils/types";
-import { createDagreEngine, initDiagram, populateModel, relayoutDiagram } from "./utils/diagram";
+import { DisplayModel, TaskStatuses } from "./utils/types";
+import { addStatusToDisplayModel, createDagreEngine, initDiagram, populateModel, relayoutDiagram } from "./utils/diagram";
 import {
     createRequestUpdateAction,
     createSendUpdateAction,
@@ -72,6 +72,7 @@ const ScenarioDag = (props: ScenarioDagProps) => {
     const [engine] = useState(createEngine);
     const [dagreEngine] = useState(createDagreEngine);
     const [displayModel, setDisplayModel] = useState<DisplayModel>();
+    const [taskStatuses, setTaskStatuses] = useState<TaskStatuses>();
     const dispatch = useDispatch();
     const module = useModule();
 
@@ -95,6 +96,10 @@ const ScenarioDag = (props: ScenarioDagProps) => {
         if (typeof ids === "string" ? ids === scenarioId : Array.isArray(ids) ? ids.includes(scenarioId) : ids) {
             props.updateVarName && dispatch(createRequestUpdateAction(props.id, module, [props.updateVarName], true));
         }
+        const tasks = props.coreChanged?.tasks;
+        if (tasks) {
+            setTaskStatuses(tasks as TaskStatuses);
+        }
     }, [props.coreChanged, props.updateVarName, scenarioId, module, dispatch, props.id]);
 
     useEffect(() => {
@@ -108,8 +113,9 @@ const ScenarioDag = (props: ScenarioDagProps) => {
                 // Do nothing
             }
         }
+        dm = addStatusToDisplayModel(dm, taskStatuses);
         setDisplayModel((oldDm) => (deepEqual(oldDm, dm) ? oldDm : dm));
-    }, [props.scenario, props.defaultScenario]);
+    }, [props.scenario, props.defaultScenario, taskStatuses]);
 
     const relayout = useCallback(() => relayoutDiagram(engine, dagreEngine), [engine, dagreEngine]);
 
@@ -118,17 +124,17 @@ const ScenarioDag = (props: ScenarioDagProps) => {
     useEffect(() => {
         const model = new TaipyDiagramModel();
         initDiagram(engine);
-
+        let doLayout = false;
         if (displayModel) {
             setScenarioId(displayModel[0]);
             // populate model
-            populateModel(displayModel, model);
+            doLayout = populateModel(displayModel, model);
         }
         engine.setModel(model);
         // Block deletion
         //engine.getActionEventBus().registerAction(new DeleteItemsAction({ keyCodes: [1] }));
         model.setLocked(true);
-        setTimeout(relayout, 500);
+        doLayout && setTimeout(relayout, 500);
     }, [displayModel, engine, relayout]);
 
     useEffect(() => {

+ 8 - 2
frontend/taipy/src/projectstorm/NodeWidget.tsx

@@ -21,16 +21,17 @@ import { Datanode as DIcon, Task as TIcon, Sequence as PIcon, Scenario as SIcon
 import { TaipyNodeModel } from "./models";
 import { IN_PORT_NAME } from "../utils/diagram";
 import { Input, Output } from "../icons";
+import { TaskStatus } from "../utils/types";
 
 // eslint-disable-next-line @typescript-eslint/no-namespace
 namespace S {
-    export const Node = styled.div<{ background?: string; selected?: boolean }>`
+    export const Node = styled.div<{ background?: string; selected?: boolean, $status?: TaskStatus }>`
         background-color: ${(p) => p.background};
         border-radius: 5px;
         color: white;
         border: solid 2px black;
         overflow: visible;
-        border: solid 2px ${(p) => (p.selected ? "rgb(0,192,255)" : "black")};
+        border: solid 2px ${(p) => (p.selected ? "rgb(0,192,255)" : getStatusColor(p.$status))};
     `;
     export const Title = styled.div`
         background: rgba(0, 0, 0, 0.3);
@@ -87,6 +88,9 @@ interface NodeProps {
     engine: DiagramEngine;
 }
 
+const getStatusLabel = (status?: TaskStatus) => status == TaskStatus.Running ? "Running" : status == TaskStatus.Pending ? "Pending" : undefined
+const getStatusColor = (status?: TaskStatus) => status == TaskStatus.Running ? "rgb(0,163,108)" : status == TaskStatus.Pending ? "rgb(255,165,0)" : "black"
+
 const NodeWidget = ({ node, engine }: NodeProps) => {
     const generatePort = useCallback(
         (port: DefaultPortModel) =>
@@ -111,6 +115,8 @@ const NodeWidget = ({ node, engine }: NodeProps) => {
             data-default-node-name={node.getOptions().name}
             selected={node.isSelected()}
             background={node.getOptions().color}
+            title={getStatusLabel(node.status)}
+            $status={node.status}
         >
             <S.Title>
                 <S.TitleIcon className="icon" title={node.getType()}>

+ 4 - 0
frontend/taipy/src/projectstorm/models.ts

@@ -16,17 +16,21 @@ import { DefaultNodeModel, DefaultNodeModelOptions, DefaultPortModel, DefaultPor
 import { IN_PORT_NAME, OUT_PORT_NAME } from "../utils/diagram";
 import { getChildType } from "../utils/childtype";
 import { DataNode, Task } from "../utils/names";
+import { TaskStatus } from "../utils/types";
 
 export class TaipyDiagramModel extends DiagramModel {}
 
 export interface TaipyNodeModelOptions extends DefaultNodeModelOptions {
     subtype?: string;
+    status?: TaskStatus;
 }
 export class TaipyNodeModel extends DefaultNodeModel {
     subtype: string | undefined;
+    status: TaskStatus | undefined;
     constructor(options?: TaipyNodeModelOptions) {
         super(options);
         this.subtype = options?.subtype;
+        this.status = options?.status
     }
 }
 

+ 29 - 15
frontend/taipy/src/utils/diagram.ts

@@ -26,19 +26,20 @@ import { getNodeColor } from "./config";
 import { TaipyDiagramModel, TaipyNodeModel } from "../projectstorm/models";
 import { TaipyNodeFactory, TaipyPortFactory } from "../projectstorm/factories";
 import { nodeTypes } from "./config";
-import { DisplayModel } from "./types";
-
-export const createDagreEngine = () => new DagreEngine({
-    graph: {
-        rankdir: "LR",
-        ranker: "longest-path",
-        marginx: 25,
-        marginy: 25,
-    },
-    includeLinks: false,
-});
-
-export const initDiagram = (engine: DiagramEngine)  => {
+import { DisplayModel, TaskStatus, TaskStatuses } from "./types";
+
+export const createDagreEngine = () =>
+    new DagreEngine({
+        graph: {
+            rankdir: "LR",
+            ranker: "longest-path",
+            marginx: 25,
+            marginy: 25,
+        },
+        includeLinks: false,
+    });
+
+export const initDiagram = (engine: DiagramEngine) => {
     nodeTypes.forEach((nodeType) => engine.getNodeFactories().registerFactory(new TaipyNodeFactory(nodeType)));
     engine.getPortFactories().registerFactory(new TaipyPortFactory());
     const state = engine.getStateMachine().getCurrentState();
@@ -58,13 +59,14 @@ export const getLinkId = (link: LinkModel) =>
     )}`;
 export const getNodeId = (node: DefaultNodeModel) => `${node.getType()}.${node.getID()}`;
 
-export const createNode = (nodeType: string, id: string, name: string, subtype: string) =>
+export const createNode = (nodeType: string, id: string, name: string, subtype: string, status?: TaskStatus) =>
     new TaipyNodeModel({
         id: id,
         type: nodeType,
         name: name,
         color: getNodeColor(nodeType),
         subtype: subtype,
+        status: status,
     });
 
 export const createLink = (outPort: DefaultPortModel, inPort: DefaultPortModel) =>
@@ -127,6 +129,17 @@ export const relayoutDiagram = (engine: DiagramEngine, dagreEngine: DagreEngine)
     engine.repaintCanvas();
 };
 
+export const addStatusToDisplayModel = (dm?: DisplayModel, taskStatuses?: TaskStatuses) => {
+    if (dm && taskStatuses) {
+        Object.values(dm[1]).forEach((node) =>
+            Object.entries(node).forEach(([id, detail]) => {
+                detail.status = taskStatuses[id];
+            })
+        );
+    }
+    return dm;
+};
+
 export const populateModel = (displayModel: DisplayModel, model: TaipyDiagramModel) => {
     const linkModels: DefaultLinkModel[] = [];
     const nodeModels: Record<string, Record<string, DefaultNodeModel>> = {};
@@ -134,7 +147,7 @@ export const populateModel = (displayModel: DisplayModel, model: TaipyDiagramMod
     displayModel[1] &&
         Object.entries(displayModel[1]).forEach(([nodeType, n]) => {
             Object.entries(n).forEach(([id, detail]) => {
-                const node = createNode(nodeType, id, detail.name, detail.type);
+                const node = createNode(nodeType, id, detail.name, detail.type, detail.status);
                 nodeModels[nodeType] = nodeModels[nodeType] || {};
                 nodeModels[nodeType][id] = node;
             });
@@ -157,4 +170,5 @@ export const populateModel = (displayModel: DisplayModel, model: TaipyDiagramMod
     Object.values(nodeModels).forEach((nm) => Object.values(nm).forEach((n) => nodeLayer.addModel(n)));
     const linkLayer = model.getActiveLinkLayer();
     linkModels.forEach((l) => linkLayer.addModel(l));
+    return Object.keys(nodeModels).length > 1;
 };

+ 9 - 1
frontend/taipy/src/utils/types.ts

@@ -1,6 +1,6 @@
 export type DisplayModel = [
     string,
-    Record<string, Record<string, { name: string; type: string }>>,
+    Record<string, Record<string, { name: string; type: string, status?: TaskStatus }>>,
     Array<[string, string, string, string]>
 ];
 
@@ -19,3 +19,11 @@ export enum NodeType {
     SEQUENCE = 2,
     NODE = 3,
 }
+
+export enum TaskStatus {
+    Quiet = 0,
+    Pending = 3,
+    Running = 4,
+}
+
+export type TaskStatuses = Record<string, TaskStatus>;

+ 14 - 7
taipy/gui/gui.py

@@ -1102,11 +1102,13 @@ class Gui:
         else:
             grouping_message.append(payload)
 
-    def __broadcast_ws(self, payload: dict):
+    def __broadcast_ws(self, payload: dict, client_id: t.Optional[str] = None):
         try:
+            to = list(self.__get_sids(client_id)) if client_id else []
             self._server._ws.emit(
                 "message",
                 payload,
+                to=to if to else None
             )
             time.sleep(0.001)
         except Exception as e:  # pragma: no cover
@@ -1188,20 +1190,24 @@ class Gui:
         else:
             self.__send_ws({"type": _WsType.MULTIPLE_UPDATE.value, "payload": payload})
 
-    def __send_ws_broadcast(self, var_name: str, var_value: t.Any):
+    def __send_ws_broadcast(self, var_name: str, var_value: t.Any, client_id: t.Optional[str] = None):
         self.__broadcast_ws(
-            {"type": _WsType.UPDATE.value, "name": _get_broadcast_var_name(var_name), "payload": {"value": var_value}}
+            {"type": _WsType.UPDATE.value, "name": _get_broadcast_var_name(var_name), "payload": {"value": var_value}},
+            client_id,
         )
 
     def __get_ws_receiver(self) -> t.Union[t.List[str], t.Any, None]:
         if self._bindings()._is_single_client():
             return None
         sid = getattr(request, "sid", None) if request else None
-        sids = self.__client_id_2_sid.get(self._get_client_id(), set())
+        sids = self.__get_sids(self._get_client_id())
         if sid:
             sids.add(sid)
         return list(sids)
 
+    def __get_sids(self, client_id: str) -> t.Set[str]:
+        return self.__client_id_2_sid.get(client_id, set())
+
     def __get_message_grouping(self):
         return (
             _getscopeattr(self, Gui.__MESSAGE_GROUPING_NAME)
@@ -1785,15 +1791,16 @@ class Gui:
     def load_config(self, config: Config) -> None:
         self._config._load(config)
 
-    def _broadcast(self, name: str, value: t.Any):
-        """NOT UNDOCUMENTED
+    def _broadcast(self, name: str, value: t.Any, client_id: t.Optional[str] = None):
+        """NOT DOCUMENTED
         Send the new value of a variable to all connected clients.
 
         Arguments:
             name: The name of the variable to update or create.
             value: The value (must be serializable to the JSON format).
+            client_id: The client id (broadcast to all client if None)
         """
-        self.__send_ws_broadcast(name, value)
+        self.__send_ws_broadcast(name, value, client_id)
 
     def _broadcast_all_clients(self, name: str, value: t.Any):
         try:

+ 33 - 31
taipy/gui_core/_context.py

@@ -127,17 +127,8 @@ class _GuiCoreContext(CoreEventConsumerBase):
         elif event.entity_type == EventEntityType.JOB:
             with self.lock:
                 self.jobs_list = None
-            if event.operation == EventOperation.UPDATE:
-                try:
-                    job_entity = t.cast(Job, core_get(str(event.entity_id)))
-                    self.gui._broadcast(
-                        _GuiCoreContext._CORE_CHANGED_NAME,
-                        {"task": {"id": job_entity.task.id, "status": job_entity.status.name}},
-                    )
-                except Exception as e:
-                    _warn(f"Access to sequence {event.entity_id} failed", e)
         elif event.entity_type == EventEntityType.SUBMISSION:
-            self.scenario_status_callback(event.entity_id)
+            self.submission_status_callback(event.entity_id)
         elif event.entity_type == EventEntityType.DATA_NODE:
             with self.lock:
                 self.data_nodes_by_owner = None
@@ -155,7 +146,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
             {"scenario": scenario_id or True},
         )
 
-    def scenario_status_callback(self, submission_id: t.Optional[str]):
+    def submission_status_callback(self, submission_id: t.Optional[str]):
         if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
             return
         try:
@@ -167,25 +158,35 @@ class _GuiCoreContext(CoreEventConsumerBase):
             if not submission or not submission.entity_id:
                 return
 
-            entity = core_get(submission.entity_id)
-            if not entity:
-                return
-
             new_status = submission.submission_status
-            if last_status != new_status:
-                # callback
-                submission_name = submission.properties.get("on_submission")
-                if not submission_name:
-                    return
-                submission_fn = self.gui._get_user_function(submission_name)
-                if not callable(submission_fn):
-                    return
-                self.gui._call_user_callback(
-                    submission.properties.get("client_id"),
-                    submission_fn,
-                    [entity, {"submission_status": new_status.name}],
-                    submission.properties.get("module_context"),
-                )
+
+            client_id = submission.properties.get("client_id")
+            if client_id:
+                running_tasks = {}
+                for job in submission.jobs:
+                    job = job if isinstance(job, Job) else core_get(job)
+                    running_tasks[job.task.id] = (
+                        SubmissionStatus.RUNNING.value
+                        if job.is_running()
+                        else SubmissionStatus.PENDING.value
+                        if job.is_pending()
+                        else None
+                    )
+                self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"tasks": running_tasks}, client_id)
+
+                if last_status != new_status:
+                    # callback
+                    submission_name = submission.properties.get("on_submission")
+                    if submission_name:
+                        submission_fn = self.gui._get_user_function(submission_name)
+                        if callable(submission_fn):
+                            self.gui._call_user_callback(
+                                submission.properties.get("client_id"),
+                                submission_fn,
+                                [core_get(submission.entity_id), {"submission_status": new_status.name}],
+                                submission.properties.get("module_context"),
+                            )
+
             with self.submissions_lock:
                 if new_status in (
                     SubmissionStatus.COMPLETED,
@@ -455,8 +456,9 @@ class _GuiCoreContext(CoreEventConsumerBase):
                     with self.submissions_lock:
                         self.client_submission[submission_entity.id] = submission_entity.submission_status
                     if Config.core.mode == "development":
-                        self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
-                        self.scenario_status_callback(submission_entity.id)
+                        with self.submissions_lock:
+                            self.client_submission[submission_entity.id] = SubmissionStatus.SUBMITTED
+                        self.submission_status_callback(submission_entity.id)
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
             except Exception as e:
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error submitting entity. {e}")

+ 12 - 7
tests/gui_core/test_context_is_readable.py

@@ -15,7 +15,7 @@ from unittest.mock import Mock, patch
 from taipy.config.common.scope import Scope
 from taipy.core import Job, JobId, Scenario, Task
 from taipy.core.data.pickle import PickleDataNode
-from taipy.core.submission.submission import Submission
+from taipy.core.submission.submission import Submission, SubmissionStatus
 from taipy.gui import Gui
 from taipy.gui_core._context import _GuiCoreContext
 
@@ -24,7 +24,12 @@ a_task = Task("task_config_id", {}, print)
 a_job = Job(t.cast(JobId, "JOB_job_id"), a_task, "submit_id", a_scenario.id)
 a_job.isfinished = lambda s: True  # type: ignore[attr-defined]
 a_datanode = PickleDataNode("data_node_config_id", Scope.SCENARIO)
-a_submission = Submission(a_scenario.id, "Scenario", a_scenario.config_id)
+a_submission = Submission(
+    a_scenario.id,
+    "Scenario",
+    a_scenario.config_id,
+    properties={"client_id": "client_id", "on_submission": "on_submission"},
+)
 
 
 def mock_is_readable_false(entity_id):
@@ -142,7 +147,7 @@ class TestGuiCoreContext_is_readable:
                 assert assign.call_args.args[0] == "gui_core_sv_error"
                 assert str(assign.call_args.args[1]).endswith("is not readable.")
 
-    def test_scenario_status_callback(self):
+    def test_submission_status_callback(self):
         with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get) as mockget:
             mockget.reset_mock()
             gui_core_context = _GuiCoreContext(Mock())
@@ -150,8 +155,8 @@ class TestGuiCoreContext_is_readable:
             def sub_cb():
                 return True
 
-            gui_core_context.client_submission[a_submission.id] = a_submission.submission_status
-            gui_core_context.scenario_status_callback(a_submission.id)
+            gui_core_context.client_submission[a_submission.id] = SubmissionStatus.UNDEFINED
+            gui_core_context.submission_status_callback(a_submission.id)
             mockget.assert_called()
             found = False
             for call in mockget.call_args_list:
@@ -162,7 +167,7 @@ class TestGuiCoreContext_is_readable:
             mockget.reset_mock()
 
             with patch("taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
-                gui_core_context.scenario_status_callback(a_submission.id)
+                gui_core_context.submission_status_callback(a_submission.id)
                 mockget.assert_not_called()
 
     def test_data_node_adapter(self):
@@ -323,7 +328,7 @@ class TestGuiCoreContext_is_readable:
             mockget.reset_mock()
 
             with patch("taipy.gui_core._context.is_readable", side_effect=mock_is_readable_false):
-                gui_core_context.scenario_status_callback(a_scenario.id)
+                gui_core_context.submission_status_callback(a_scenario.id)
                 mockget.assert_not_called()
 
     def test_update_data(self):