Explorar o código

migrate job status update for scenario status to using submission entity

Toan Quach hai 1 ano
pai
achega
e35c487e0c
Modificáronse 2 ficheiros con 30 adicións e 336 borrados
  1. 30 103
      taipy/gui_core/_context.py
  2. 0 233
      tests/gui_core/test_context_submission_status.py

+ 30 - 103
taipy/gui_core/_context.py

@@ -12,7 +12,6 @@
 import json
 import typing as t
 from collections import defaultdict
-from enum import Enum
 from numbers import Number
 from threading import Lock
 
@@ -23,6 +22,7 @@ except ImportError:
 
 import pandas as pd
 from dateutil import parser
+
 from taipy.config import Config
 from taipy.core import Cycle, DataNode, Job, Scenario, Sequence, cancel_job, create_scenario
 from taipy.core import delete as core_delete
@@ -31,7 +31,6 @@ from taipy.core import get as core_get
 from taipy.core import (
     get_cycles_scenarios,
     get_data_nodes,
-    get_jobs,
     is_deletable,
     is_editable,
     is_promotable,
@@ -44,6 +43,8 @@ from taipy.core.data._abstract_tabular import _AbstractTabularDataNode
 from taipy.core.notification import CoreEventConsumerBase, EventEntityType
 from taipy.core.notification.event import Event, EventOperation
 from taipy.core.notification.notifier import Notifier
+from taipy.core.submission.submission import Submission
+from taipy.core.submission.submission_status import SubmissionStatus
 from taipy.gui import Gui, State
 from taipy.gui._warnings import _warn
 from taipy.gui.gui import _DoNotUpdate
@@ -51,36 +52,21 @@ from taipy.gui.gui import _DoNotUpdate
 from ._adapters import _EntityType
 
 
-class _SubmissionStatus(Enum):
-    SUBMITTED = 0
-    COMPLETED = 1
-    CANCELED = 2
-    FAILED = 3
-    BLOCKED = 4
-    WAITING = 5
-    RUNNING = 6
-    UNDEFINED = 7
-
-
 class _SubmissionDetails:
     def __init__(
         self,
         client_id: str,
         module_context: str,
         callback: t.Callable,
-        entity_id: str,
-        status: _SubmissionStatus,
-        jobs: t.List[Job],
+        submission: Submission,
     ) -> None:
         self.client_id = client_id
         self.module_context = module_context
         self.callback = callback
-        self.entity_id = entity_id
-        self.status = status
-        self.jobs = jobs
+        self.submission = submission
 
-    def set_status(self, status: _SubmissionStatus):
-        self.status = status
+    def set_submission(self, submission: Submission):
+        self.submission = submission
         return self
 
 
@@ -112,7 +98,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
         self.data_nodes_by_owner: t.Optional[t.Dict[t.Optional[str], DataNode]] = None
         self.scenario_configs: t.Optional[t.List[t.Tuple[str, str]]] = None
         self.jobs_list: t.Optional[t.List[Job]] = None
-        self.client_jobs_by_submission: t.Dict[str, _SubmissionDetails] = dict()
+        self.client_submission: t.Dict[str, _SubmissionDetails] = dict()
         # register to taipy core notification
         reg_id, reg_queue = Notifier.register()
         # locks
@@ -144,9 +130,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
                     )
             except Exception as e:
                 _warn(f"Access to sequence {event.entity_id} failed", e)
-        elif event.entity_type == EventEntityType.JOB:
-            with self.lock:
-                self.jobs_list = None
+        elif event.entity_type == EventEntityType.SUBMISSION:
             self.scenario_status_callback(event.entity_id)
         elif event.entity_type == EventEntityType.DATA_NODE:
             with self.lock:
@@ -165,50 +149,45 @@ class _GuiCoreContext(CoreEventConsumerBase):
             {"scenario": scenario_id or True},
         )
 
-    def scenario_status_callback(self, job_id: str, is_submission: t.Optional[bool] = False):
-        if not job_id or not (is_submission or is_readable(job_id)):
+    def scenario_status_callback(self, submission_id: str, is_submission: t.Optional[bool] = False):
+        if not submission_id or not (is_submission or is_readable(submission_id)):
             return
         try:
             if is_submission:
-                sub_id = job_id
-                job = None
+                sub_id = submission_id
+                submission = None
             else:
-                job = core_get(job_id)
-                if not job:
-                    return
-                sub_id = job.submit_id
-            sub_details = self.client_jobs_by_submission.get(sub_id)
-            if not sub_details:
-                return
+                submission: Submission = core_get(submission_id)
+            sub_details: t.Optional[_SubmissionDetails] = self.client_submission.get(sub_id)
 
-            if not sub_details.client_id or not sub_details.entity_id or not sub_details.jobs:
+            if not submission or not submission.entity_id or sub_details:
                 return
 
-            entity = core_get(sub_details.entity_id)
+            entity = core_get(submission.entity_id)
             if not entity:
                 return
 
-            new_status = self._get_submittable_status(sub_details.jobs)
-            if sub_details.status != new_status:
+            new_status = submission.submission_status
+            if sub_details.submission.submission_status != new_status:
                 # callback
                 self.gui._call_user_callback(
                     sub_details.client_id,
                     sub_details.callback,
-                    [entity, {"submission_status": new_status.name, "job": job}],
+                    [entity, {"submission_status": new_status.name}],
                     sub_details.module_context,
                 )
             with self.submissions_lock:
                 if new_status in (
-                    _SubmissionStatus.COMPLETED,
-                    _SubmissionStatus.FAILED,
-                    _SubmissionStatus.CANCELED,
+                    SubmissionStatus.COMPLETED,
+                    SubmissionStatus.FAILED,
+                    SubmissionStatus.CANCELED,
                 ):
-                    self.client_jobs_by_submission.pop(sub_id, None)
+                    self.client_submission.pop(sub_id, None)
                 else:
-                    self.client_jobs_by_submission[sub_id] = sub_details.set_status(new_status)
+                    self.client_submission[sub_id] = sub_details.set_submission(new_status)
 
         except Exception as e:
-            _warn(f"Job ({job_id}) is not available", e)
+            _warn(f"Job ({submission_id}) is not available", e)
 
         finally:
             self.gui._broadcast(_GuiCoreContext._CORE_CHANGED_NAME, {"jobs": True})
@@ -441,69 +420,23 @@ class _GuiCoreContext(CoreEventConsumerBase):
                 if submission_cb := data.get("on_submission_change"):
                     submission_fn = self.gui._get_user_function(submission_cb)
                     if callable(submission_fn):
-                        job_ids = [j.id for j in (jobs if isinstance(jobs, list) else [jobs])]
+                        submission_entity = core_get(jobs[0].submit_id if isinstance(jobs, list) else jobs.submit_id)
                         client_id = self.gui._get_client_id()
                         module_context = self.gui._get_locals_context()
-                        sub_id = jobs[0].submit_id if isinstance(jobs, list) else jobs.submit_id
                         with self.submissions_lock:
-                            self.client_jobs_by_submission[sub_id] = _SubmissionDetails(
+                            self.client_submission[submission_entity.submit_entity_id] = _SubmissionDetails(
                                 client_id,
                                 module_context,
                                 submission_fn,
-                                entity_id,
-                                _SubmissionStatus.SUBMITTED,
-                                job_ids,
+                                submission_entity,
                             )
                     else:
                         _warn(f"on_submission_change(): '{submission_cb}' is not a valid function.")
-                self.scenario_status_callback(jobs[0].id if len(jobs) else "" if isinstance(jobs, list) else jobs.id)
+                self.scenario_status_callback(submission_entity.id)
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, "")
             except Exception as e:
                 state.assign(_GuiCoreContext._SCENARIO_VIZ_ERROR_VAR, f"Error submitting entity. {e}")
 
-    def _get_submittable_status(self, jobs_ids: t.List[str]) -> _SubmissionStatus:
-        abandoned = False
-        canceled = False
-        blocked = False
-        waiting = False
-        running = False
-        completed = False
-        for id in jobs_ids:
-            job = core_get(id)
-            if not job:
-                continue
-            if job.is_failed():
-                return _SubmissionStatus.FAILED
-            if job.is_canceled():
-                canceled = True
-            if job.is_blocked():
-                blocked = True
-                continue
-            if job.is_pending() or job.is_submitted():
-                waiting = True
-                continue
-            if job.is_running():
-                running = True
-                continue
-            if job.is_completed() or job.is_skipped():
-                completed = True
-                continue
-            if job.is_abandoned():
-                abandoned = True
-        if canceled:
-            return _SubmissionStatus.CANCELED
-        if abandoned:
-            return _SubmissionStatus.UNDEFINED
-        if running:
-            return _SubmissionStatus.RUNNING
-        if waiting:
-            return _SubmissionStatus.WAITING
-        if blocked:
-            return _SubmissionStatus.BLOCKED
-        if completed:
-            return _SubmissionStatus.COMPLETED
-        return _SubmissionStatus.UNDEFINED
-
     def __do_datanodes_tree(self):
         if self.data_nodes_by_owner is None:
             self.data_nodes_by_owner = defaultdict(list)
@@ -557,12 +490,6 @@ class _GuiCoreContext(CoreEventConsumerBase):
 
         return None
 
-    def get_jobs_list(self):
-        with self.lock:
-            if self.jobs_list is None:
-                self.jobs_list = get_jobs()
-            return self.jobs_list
-
     def job_adapter(self, job):
         try:
             if hasattr(job, "id") and is_readable(job.id) and core_get(job.id) is not None:

+ 0 - 233
tests/gui_core/test_context_submission_status.py

@@ -1,233 +0,0 @@
-# Copyright 2023 Avaiga Private Limited
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
-# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations under the License.
-
-from unittest.mock import Mock, patch
-
-import pytest
-
-from taipy.core import Status
-from taipy.gui_core._context import _GuiCoreContext, _SubmissionStatus
-
-
-class MockJob:
-    def __init__(self, id: str, status):
-        self.status = status
-        self.id = id
-
-    def is_failed(self):
-        return self.status == Status.FAILED
-
-    def is_canceled(self):
-        return self.status == Status.CANCELED
-
-    def is_blocked(self):
-        return self.status == Status.BLOCKED
-
-    def is_pending(self):
-        return self.status == Status.PENDING
-
-    def is_running(self):
-        return self.status == Status.RUNNING
-
-    def is_completed(self):
-        return self.status == Status.COMPLETED
-
-    def is_skipped(self):
-        return self.status == Status.SKIPPED
-
-    def is_abandoned(self):
-        return self.status == Status.ABANDONED
-
-    def is_submitted(self):
-        return self.status == Status.SUBMITTED
-
-
-def mock_core_get(entity_id):
-    jobs = {
-        "job0_submitted": MockJob("job0_submitted", Status.SUBMITTED),
-        "job1_failed": MockJob("job1_failed", Status.FAILED),
-        "job2_canceled": MockJob("job2_canceled", Status.CANCELED),
-        "job3_blocked": MockJob("job3_blocked", Status.BLOCKED),
-        "job4_pending": MockJob("job4_pending", Status.PENDING),
-        "job5_running": MockJob("job5_running", Status.RUNNING),
-        "job6_completed": MockJob("job6_completed", Status.COMPLETED),
-        "job7_skipped": MockJob("job7_skipped", Status.SKIPPED),
-        "job8_abandoned": MockJob("job8_abandoned", Status.ABANDONED),
-    }
-    return jobs[entity_id]
-
-
-class TestGuiCoreContext_SubmissionStatus:
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job1_failed"], _SubmissionStatus.FAILED),
-            (["job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job3_blocked"], _SubmissionStatus.BLOCKED),
-            (["job4_pending"], _SubmissionStatus.WAITING),
-            (["job5_running"], _SubmissionStatus.RUNNING),
-            (["job6_completed"], _SubmissionStatus.COMPLETED),
-            (["job7_skipped"], _SubmissionStatus.COMPLETED),
-            (["job8_abandoned"], _SubmissionStatus.UNDEFINED),
-        ],
-    )
-    def test_single_job(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job1_failed", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job2_canceled"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job3_blocked"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job4_pending"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job5_running"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job6_completed"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job7_skipped"], _SubmissionStatus.FAILED),
-            (["job1_failed", "job8_abandoned"], _SubmissionStatus.FAILED),
-            (["job2_canceled", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job3_blocked", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job4_pending", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job5_running", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job6_completed", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job7_skipped", "job1_failed"], _SubmissionStatus.FAILED),
-            (["job8_abandoned", "job1_failed"], _SubmissionStatus.FAILED),
-        ],
-    )
-    def test_one_failed_job(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job2_canceled", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job3_blocked"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job4_pending"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job5_running"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job6_completed"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job7_skipped"], _SubmissionStatus.CANCELED),
-            (["job2_canceled", "job8_abandoned"], _SubmissionStatus.CANCELED),
-            (["job3_blocked", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job4_pending", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job5_running", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job6_completed", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job7_skipped", "job2_canceled"], _SubmissionStatus.CANCELED),
-            (["job8_abandoned", "job2_canceled"], _SubmissionStatus.CANCELED),
-        ],
-    )
-    def test_no_failed_one_cancel(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job4_pending", "job3_blocked"], _SubmissionStatus.WAITING),
-            (["job4_pending", "job4_pending"], _SubmissionStatus.WAITING),
-            (["job4_pending", "job6_completed"], _SubmissionStatus.WAITING),
-            (["job4_pending", "job7_skipped"], _SubmissionStatus.WAITING),
-            (["job3_blocked", "job4_pending"], _SubmissionStatus.WAITING),
-            (["job6_completed", "job4_pending"], _SubmissionStatus.WAITING),
-            (["job7_skipped", "job4_pending"], _SubmissionStatus.WAITING),
-        ],
-    )
-    def test_no_failed_or_cancel_one_pending(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job5_running", "job3_blocked"], _SubmissionStatus.RUNNING),
-            (["job5_running", "job4_pending"], _SubmissionStatus.RUNNING),
-            (["job5_running", "job5_running"], _SubmissionStatus.RUNNING),
-            (["job5_running", "job6_completed"], _SubmissionStatus.RUNNING),
-            (["job5_running", "job7_skipped"], _SubmissionStatus.RUNNING),
-            (["job3_blocked", "job5_running"], _SubmissionStatus.RUNNING),
-            (["job4_pending", "job5_running"], _SubmissionStatus.RUNNING),
-            (["job6_completed", "job5_running"], _SubmissionStatus.RUNNING),
-            (["job7_skipped", "job5_running"], _SubmissionStatus.RUNNING),
-        ],
-    )
-    def test_no_failed_cancel_nor_pending_one_running(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job3_blocked", "job3_blocked"], _SubmissionStatus.BLOCKED),
-            (["job3_blocked", "job6_completed"], _SubmissionStatus.BLOCKED),
-            (["job3_blocked", "job7_skipped"], _SubmissionStatus.BLOCKED),
-            (["job6_completed", "job3_blocked"], _SubmissionStatus.BLOCKED),
-            (["job7_skipped", "job3_blocked"], _SubmissionStatus.BLOCKED),
-        ],
-    )
-    def test_no_failed_cancel_pending_nor_running_one_blocked(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job6_completed", "job6_completed"], _SubmissionStatus.COMPLETED),
-            (["job6_completed", "job7_skipped"], _SubmissionStatus.COMPLETED),
-            (["job7_skipped", "job6_completed"], _SubmissionStatus.COMPLETED),
-            (["job7_skipped", "job7_skipped"], _SubmissionStatus.COMPLETED),
-        ],
-    )
-    def test_only_completed_or_skipped(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    @pytest.mark.parametrize(
-        "job_ids, expected_status",
-        [
-            (["job3_blocked", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job4_pending", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job5_running", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job6_completed", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job7_skipped", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job8_abandoned"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job3_blocked"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job4_pending"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job5_running"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job6_completed"], _SubmissionStatus.UNDEFINED),
-            (["job8_abandoned", "job7_skipped"], _SubmissionStatus.UNDEFINED),
-        ],
-    )
-    def test_WRONG_CASE_abandoned_without_cancel_or_failed(self, job_ids, expected_status):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status(job_ids)
-            assert status == expected_status
-
-    def test_no_job(self):
-        with patch("taipy.gui_core._context.core_get", side_effect=mock_core_get):
-            gui_core_context = _GuiCoreContext(Mock())
-            status = gui_core_context._get_submittable_status([])
-            assert status == _SubmissionStatus.UNDEFINED