Browse Source

submission status callback (#211) (#300)

* #211 submission status callback

* doc

* fab's comment

* format

* JR comment

* add unit tests and fix pathological cases

* add new line at the end of the file

* fab's comment about doc

* format

* test no job case

* remove twice the same test

* format

* fix import

---------

Co-authored-by: Fred Lefévère-Laoide <Fred.Lefevere-Laoide@Taipy.io>
Co-authored-by: jrobinAV <88036007+jrobinAV@users.noreply.github.com>
Fred Lefévère-Laoide 1 year ago
parent
commit
dec9a4f978

+ 5 - 4
gui/src/ScenarioViewer.tsx

@@ -77,6 +77,7 @@ interface ScenarioViewerProps {
     libClassName?: string;
     className?: string;
     dynamicClassName?: string;
+    onSubmissionChange?: string;
 }
 
 interface SequencesRowProps {
@@ -278,19 +279,19 @@ const ScenarioViewer = (props: ScenarioViewerProps) => {
     // submits
     const submitSequence = useCallback(
         (sequenceId: string) => {
-            dispatch(createSendActionNameAction(id, module, props.onSubmit, { id: sequenceId }));
+            dispatch(createSendActionNameAction(id, module, props.onSubmit, { id: sequenceId, on_submission_change: props.onSubmissionChange }));
         },
-        [props.onSubmit, id, dispatch, module]
+        [props.onSubmit, props.onSubmissionChange, id, dispatch, module]
     );
 
     const submitScenario = useCallback(
         (e: React.MouseEvent<HTMLElement>) => {
             e.stopPropagation();
             if (isScenario) {
-                dispatch(createSendActionNameAction(id, module, props.onSubmit, { id: scId }));
+                dispatch(createSendActionNameAction(id, module, props.onSubmit, { id: scId, on_submission_change: props.onSubmissionChange }));
             }
         },
-        [isScenario, props.onSubmit, id, scId, dispatch, module]
+        [isScenario, props.onSubmit, props.onSubmissionChange, id, scId, dispatch, module]
     );
 
     // focus

+ 1 - 0
src/taipy/gui_core/GuiCoreLib.py

@@ -73,6 +73,7 @@ class _GuiCore(ElementLibrary):
                 "show_sequences": ElementProperty(PropertyType.boolean, True),
                 "show_submit_sequences": ElementProperty(PropertyType.boolean, True),
                 "class_name": ElementProperty(PropertyType.dynamic_string),
+                "on_submission_change": ElementProperty(PropertyType.function),
             },
             inner_properties={
                 "on_edit": ElementProperty(PropertyType.function, f"{{{__CTX_VAR_NAME}.edit_entity}}"),

+ 152 - 2
src/taipy/gui_core/_context.py

@@ -12,6 +12,7 @@
 import json
 import typing as t
 from collections import defaultdict
+from enum import Enum
 from numbers import Number
 from threading import Lock
 
@@ -41,6 +42,39 @@ from taipy.gui.gui import _DoNotUpdate
 from ._adapters import _EntityType
 
 
+class _SubmissionStatus(Enum):
+    SUBMITTED = 0
+    COMPLETED = 1
+    CANCELED = 2
+    FAILED = 3
+    BLOCKED = 4
+    WAITING = 5
+    RUNNING = 6
+    UNDEFINED = 7
+
+
+class _SubmissionDetails:
+    def __init__(
+        self,
+        client_id: str,
+        module_context: str,
+        callback: str,
+        entity_id: str,
+        status: _SubmissionStatus,
+        jobs: t.List[Job],
+    ) -> None:
+        self.client_id = client_id
+        self.module_context = module_context
+        self.callback = callback
+        self.entity_id = entity_id
+        self.status = status
+        self.jobs = jobs
+
+    def set_status(self, status: _SubmissionStatus):
+        self.status = status
+        return self
+
+
 class _GuiCoreContext(CoreEventConsumerBase):
     __PROP_ENTITY_ID = "id"
     __PROP_ENTITY_COMMENT = "comment"
@@ -69,9 +103,12 @@ class _GuiCoreContext(CoreEventConsumerBase):
         self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], DataNode]] = None
         self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
         self.jobs_list: t.Optional[t.List[Job]] = None
+        self.client_jobs_by_submission: t.Dict[str, _SubmissionDetails] = dict()
         # register to taipy core notification
         reg_id, reg_queue = Notifier.register()
+        # locks
         self.lock = Lock()
+        self.submissions_lock = Lock()
         super().__init__(reg_id, reg_queue)
         self.start()
 
@@ -95,6 +132,8 @@ class _GuiCoreContext(CoreEventConsumerBase):
         elif event.entity_type == EventEntityType.JOB:
             with self.lock:
                 self.jobs_list = None
+            if event.entity_id:
+                self.scenario_status_callback(event.entity_id)
             self.gui.broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
         elif event.entity_type == EventEntityType.DATA_NODE:
             with self.lock:
@@ -262,7 +301,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
             except Exception as e:
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error updating Scenario. {e}")
 
-    def submit_entity(self, state: State, id: str, action: str, payload: t.Dict[str, str]):
+    def submit_entity(self, state: State, id: str, submission_cb: str, payload: t.Dict[str, str]):
         args = payload.get("args")
         if args is None or not isinstance(args, list) or len(args) < 1 or not isinstance(args[0], dict):
             return
@@ -271,11 +310,122 @@ class _GuiCoreContext(CoreEventConsumerBase):
         entity = core_get(entity_id)
         if entity:
             try:
-                core_submit(entity)
+                jobs = core_submit(entity)
+                if submission_cb := data.get("on_submission_change"):
+                    if callable(self.gui._get_user_function(submission_cb)):
+                        job_ids = [j.id for j in (jobs if isinstance(jobs, list) else [jobs])]
+                        client_id = self.gui._get_client_id()
+                        module_context = self.gui._get_locals_context()
+                        sub_id = jobs[0].submit_id if isinstance(jobs, list) else jobs.submit_id
+                        with self.submissions_lock:
+                            self.client_jobs_by_submission[sub_id] = _SubmissionDetails(
+                                client_id,
+                                module_context,
+                                submission_cb,
+                                entity_id,
+                                _SubmissionStatus.SUBMITTED,
+                                job_ids,
+                            )
+                        self.scenario_status_callback(jobs[0].id if isinstance(jobs, list) else jobs.id)
+                    else:
+                        _warn(f"on_submission_change(): '{submission_cb}' is not a valid function.")
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
             except Exception as e:
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error submitting entity. {e}")
 
+    def _get_submittable_status(self, jobs_ids: t.List[str]) -> _SubmissionStatus:
+        abandoned = False
+        canceled = False
+        blocked = False
+        waiting = False
+        running = False
+        completed = False
+        for id in jobs_ids:
+            job = core_get(id)
+            if not job:
+                continue
+            if job.is_failed():
+                return _SubmissionStatus.FAILED
+            if job.is_canceled():
+                canceled = True
+            if job.is_blocked():
+                blocked = True
+                continue
+            if job.is_pending() or job.is_submitted():
+                waiting = True
+                continue
+            if job.is_running():
+                running = True
+                continue
+            if job.is_completed() or job.is_skipped():
+                completed = True
+                continue
+            if job.is_abandoned():
+                abandoned = True
+        if canceled:
+            return _SubmissionStatus.CANCELED
+        if abandoned:
+            return _SubmissionStatus.UNDEFINED
+        if running:
+            return _SubmissionStatus.RUNNING
+        if waiting:
+            return _SubmissionStatus.WAITING
+        if blocked:
+            return _SubmissionStatus.BLOCKED
+        if completed:
+            return _SubmissionStatus.COMPLETED
+        return _SubmissionStatus.UNDEFINED
+
+    def scenario_status_callback(self, job_id: str):
+        if not job_id:
+            return
+        try:
+            job = core_get(job_id)
+            if not job:
+                return
+            sub_id = job.submit_id
+            sub_details = self.client_jobs_by_submission.get(sub_id)
+            if not sub_details:
+                return
+
+            if (
+                not sub_details.callback
+                or not sub_details.client_id
+                or not sub_details.entity_id
+                or not sub_details.jobs
+            ):
+                return
+
+            entity = core_get(sub_details.entity_id)
+            if not entity:
+                return
+
+            submission_function = self.gui._get_user_function(sub_details.callback)
+            if not callable(submission_function):
+                return
+
+            new_status = self._get_submittable_status(sub_details.jobs)
+            if sub_details.status is not new_status:
+                # callback
+                self.gui._call_user_callback(
+                    sub_details.client_id,
+                    submission_function,
+                    [entity, {"submission_status": new_status.name, "job": job}],
+                    sub_details.module_context,
+                )
+            with self.submissions_lock:
+                if new_status in (
+                    _SubmissionStatus.COMPLETED,
+                    _SubmissionStatus.FAILED,
+                    _SubmissionStatus.CANCELED,
+                ):
+                    self.client_jobs_by_submission.pop(sub_id, None)
+                else:
+                    self.client_jobs_by_submission[sub_id] = sub_details.set_status(new_status)
+
+        except Exception as e:
+            _warn(f"Job is not available {e}")
+
     def __do_datanodes_tree(self):
         if self.data_nodes_by_owner is None:
             self.data_nodes_by_owner = defaultdict(list)

+ 6 - 0
src/taipy/gui_core/viselements.json

@@ -149,6 +149,12 @@
                         "type": "bool",
                         "default_value": "True",
                         "doc": "If False, the buttons to submit scenario sequences are not visible."
+                    },
+                    {
+                        "name": "on_submission_change",
+                        "type": "Callback",
+                        "doc": "The name of the function that is triggered when a submission status is changed.<br/><br/>All the parameters of that function are optional:\n<ul>\n<li>state (<code>State^</code>): the state instance.</li>\n<li>submittable (Submittable): the entity (usually a Scenario) that was submitted.</li>\n<li>details (dict): the details on this callback's invocation.<br/>\nThis dictionary has the following keys:\n<ul>\n<li>submission_status (str): the new status of the submission (possible values: SUBMITTED, COMPLETED, CANCELED, FAILED, BLOCKED, WAITING, RUNNING).</li>\n<li>job: the Job (if any) that is at the orgin of the submission status change.</li>\n</ul>",
+                        "signature": [["state", "State"], ["submittable", "Submittable"], ["details", "dict"]]
                     }
                 ]
             }

+ 10 - 0
tests/gui_core/__init__.py

@@ -0,0 +1,10 @@
+# Copyright 2023 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.

+ 222 - 0
tests/gui_core/test_context_submission_status.py

@@ -0,0 +1,222 @@
+from unittest.mock import Mock, patch
+
+import pytest
+
+from src.taipy.gui_core._context import _GuiCoreContext, _SubmissionStatus
+from taipy.core import Status
+
+
+class MockJob:
+    def __init__(self, id: str, status):
+        self.status = status
+        self.id = id
+
+    def is_failed(self):
+        return self.status == Status.FAILED
+
+    def is_canceled(self):
+        return self.status == Status.CANCELED
+
+    def is_blocked(self):
+        return self.status == Status.BLOCKED
+
+    def is_pending(self):
+        return self.status == Status.PENDING
+
+    def is_running(self):
+        return self.status == Status.RUNNING
+
+    def is_completed(self):
+        return self.status == Status.COMPLETED
+
+    def is_skipped(self):
+        return self.status == Status.SKIPPED
+
+    def is_abandoned(self):
+        return self.status == Status.ABANDONED
+
+    def is_submitted(self):
+        return self.status == Status.SUBMITTED
+
+
+def mock_core_get(entity_id):
+    jobs = {
+        "job0_submitted": MockJob("job0_submitted", Status.SUBMITTED),
+        "job1_failed": MockJob("job1_failed", Status.FAILED),
+        "job2_canceled": MockJob("job2_canceled", Status.CANCELED),
+        "job3_blocked": MockJob("job3_blocked", Status.BLOCKED),
+        "job4_pending": MockJob("job4_pending", Status.PENDING),
+        "job5_running": MockJob("job5_running", Status.RUNNING),
+        "job6_completed": MockJob("job6_completed", Status.COMPLETED),
+        "job7_skipped": MockJob("job7_skipped", Status.SKIPPED),
+        "job8_abandoned": MockJob("job8_abandoned", Status.ABANDONED),
+    }
+    return jobs[entity_id]
+
+
+class TestGuiCoreContext_SubmissionStatus:
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job1_failed"], _SubmissionStatus.FAILED),
+            (["job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job3_blocked"], _SubmissionStatus.BLOCKED),
+            (["job4_pending"], _SubmissionStatus.WAITING),
+            (["job5_running"], _SubmissionStatus.RUNNING),
+            (["job6_completed"], _SubmissionStatus.COMPLETED),
+            (["job7_skipped"], _SubmissionStatus.COMPLETED),
+            (["job8_abandoned"], _SubmissionStatus.UNDEFINED),
+        ],
+    )
+    def test_single_job(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job1_failed", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job2_canceled"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job3_blocked"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job4_pending"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job5_running"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job6_completed"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job7_skipped"], _SubmissionStatus.FAILED),
+            (["job1_failed", "job8_abandoned"], _SubmissionStatus.FAILED),
+            (["job2_canceled", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job3_blocked", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job4_pending", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job5_running", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job6_completed", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job7_skipped", "job1_failed"], _SubmissionStatus.FAILED),
+            (["job8_abandoned", "job1_failed"], _SubmissionStatus.FAILED),
+        ],
+    )
+    def test_one_failed_job(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job2_canceled", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job3_blocked"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job4_pending"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job5_running"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job6_completed"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job7_skipped"], _SubmissionStatus.CANCELED),
+            (["job2_canceled", "job8_abandoned"], _SubmissionStatus.CANCELED),
+            (["job3_blocked", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job4_pending", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job5_running", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job6_completed", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job7_skipped", "job2_canceled"], _SubmissionStatus.CANCELED),
+            (["job8_abandoned", "job2_canceled"], _SubmissionStatus.CANCELED),
+        ],
+    )
+    def test_no_failed_one_cancel(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job4_pending", "job3_blocked"], _SubmissionStatus.WAITING),
+            (["job4_pending", "job4_pending"], _SubmissionStatus.WAITING),
+            (["job4_pending", "job6_completed"], _SubmissionStatus.WAITING),
+            (["job4_pending", "job7_skipped"], _SubmissionStatus.WAITING),
+            (["job3_blocked", "job4_pending"], _SubmissionStatus.WAITING),
+            (["job6_completed", "job4_pending"], _SubmissionStatus.WAITING),
+            (["job7_skipped", "job4_pending"], _SubmissionStatus.WAITING),
+        ],
+    )
+    def test_no_failed_or_cancel_one_pending(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job5_running", "job3_blocked"], _SubmissionStatus.RUNNING),
+            (["job5_running", "job4_pending"], _SubmissionStatus.RUNNING),
+            (["job5_running", "job5_running"], _SubmissionStatus.RUNNING),
+            (["job5_running", "job6_completed"], _SubmissionStatus.RUNNING),
+            (["job5_running", "job7_skipped"], _SubmissionStatus.RUNNING),
+            (["job3_blocked", "job5_running"], _SubmissionStatus.RUNNING),
+            (["job4_pending", "job5_running"], _SubmissionStatus.RUNNING),
+            (["job6_completed", "job5_running"], _SubmissionStatus.RUNNING),
+            (["job7_skipped", "job5_running"], _SubmissionStatus.RUNNING),
+        ],
+    )
+    def test_no_failed_cancel_nor_pending_one_running(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job3_blocked", "job3_blocked"], _SubmissionStatus.BLOCKED),
+            (["job3_blocked", "job6_completed"], _SubmissionStatus.BLOCKED),
+            (["job3_blocked", "job7_skipped"], _SubmissionStatus.BLOCKED),
+            (["job6_completed", "job3_blocked"], _SubmissionStatus.BLOCKED),
+            (["job7_skipped", "job3_blocked"], _SubmissionStatus.BLOCKED),
+        ],
+    )
+    def test_no_failed_cancel_pending_nor_running_one_blocked(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job6_completed", "job6_completed"], _SubmissionStatus.COMPLETED),
+            (["job6_completed", "job7_skipped"], _SubmissionStatus.COMPLETED),
+            (["job7_skipped", "job6_completed"], _SubmissionStatus.COMPLETED),
+            (["job7_skipped", "job7_skipped"], _SubmissionStatus.COMPLETED),
+        ],
+    )
+    def test_only_completed_or_skipped(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    @pytest.mark.parametrize(
+        "job_ids, expected_status",
+        [
+            (["job3_blocked", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job4_pending", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job5_running", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job6_completed", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job7_skipped", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job3_blocked"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job4_pending"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job5_running"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job6_completed"], _SubmissionStatus.UNDEFINED),
+            (["job8_abandoned", "job7_skipped"], _SubmissionStatus.UNDEFINED),
+        ],
+    )
+    def test_WRONG_CASE_abandoned_without_cancel_or_failed(self, job_ids, expected_status):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status(job_ids)
+            assert status == expected_status
+
+    def test_no_job(self):
+        with patch("src.taipy.gui_core._context.core_get", side_effect=mock_core_get):
+            gui_core_context = _GuiCoreContext(Mock())
+            status = gui_core_context._get_submittable_status([])
+            assert status == _SubmissionStatus.UNDEFINED