浏览代码

Merge pull request #1277 from Avaiga/feature/#1197-add-job-to-on-submission-change-event

feature/#1197 pass job as metadata for event
Toan Quach 11 月之前
父节点
当前提交
d490ef157f

+ 1 - 3
taipy/core/_entity/_reload.py

@@ -104,6 +104,4 @@ def _get_manager(manager: str) -> _Manager:
         "job": _JobManagerFactory._build_manager(),
         "task": _TaskManagerFactory._build_manager(),
         "submission": _SubmissionManagerFactory._build_manager(),
-    }[
-        manager
-    ]  # type: ignore
+    }[manager]  # type: ignore

+ 4 - 3
taipy/core/_orchestrator/_orchestrator.py

@@ -165,10 +165,11 @@ class _Orchestrator(_AbstractOrchestrator):
 
     @classmethod
     def _update_submission_status(cls, job: Job):
-        if submission := _SubmissionManagerFactory._build_manager()._get(job.submit_id):
-            submission._update_submission_status(job)
+        submission_manager = _SubmissionManagerFactory._build_manager()
+        if submission := submission_manager._get(job.submit_id):
+            submission_manager._update_submission_status(submission, job)
         else:
-            submissions = _SubmissionManagerFactory._build_manager()._get_all()
+            submissions = submission_manager._get_all()
             cls.__logger.error(f"Submission {job.submit_id} not found.")
             msg = "\n--------------------------------------------------------------------------------\n"
             msg += f"Submission {job.submit_id} not found.\n"

+ 1 - 0
taipy/core/notification/notifier.py

@@ -82,6 +82,7 @@ class Notifier:
                     <li>TASK</li>
                     <li>DATA_NODE</li>
                     <li>JOB</li>
+                    <li>SUBMISSION</li>
                 </ul>
             entity_id (Optional[str]): If provided, the listener will be notified
                 for all events related to this entity. Otherwise, the listener

+ 88 - 0
taipy/core/submission/_submission_manager.py

@@ -9,13 +9,17 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations under the License.
 
+from threading import Lock
 from typing import List, Optional, Union
 
+from taipy.logger._taipy_logger import _TaipyLogger
+
 from .._entity._entity_ids import _EntityIds
 from .._manager._manager import _Manager
 from .._repository._abstract_repository import _AbstractRepository
 from .._version._version_mixin import _VersionMixin
 from ..exceptions.exceptions import SubmissionNotDeletedException
+from ..job.job import Job, Status
 from ..notification import EventEntityType, EventOperation, Notifier, _make_event
 from ..scenario.scenario import Scenario
 from ..sequence.sequence import Sequence
@@ -27,6 +31,8 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
     _ENTITY_NAME = Submission.__name__
     _repository: _AbstractRepository
     _EVENT_ENTITY_TYPE = EventEntityType.SUBMISSION
+    __lock = Lock()
+    __logger = _TaipyLogger._get_logger()
 
     @classmethod
     def _get_all(cls, version_number: Optional[str] = None) -> List[Submission]:
@@ -47,6 +53,88 @@ class _SubmissionManager(_Manager[Submission], _VersionMixin):
 
         return submission
 
+    @classmethod
+    def _update_submission_status(cls, submission: Submission, job: Job):
+        with cls.__lock:
+            submission = cls._get(submission)
+
+            if submission._submission_status == SubmissionStatus.FAILED:
+                return
+
+            job_status = job.status
+            if job_status == Status.FAILED:
+                submission._submission_status = SubmissionStatus.FAILED
+                cls._set(submission)
+                cls.__logger.debug(
+                    f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`."
+                )
+                return
+            if job_status == Status.CANCELED:
+                submission._is_canceled = True
+            elif job_status == Status.BLOCKED:
+                submission._blocked_jobs.add(job.id)
+                submission._pending_jobs.discard(job.id)
+            elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
+                submission._pending_jobs.add(job.id)
+                submission._blocked_jobs.discard(job.id)
+            elif job_status == Status.RUNNING:
+                submission._running_jobs.add(job.id)
+                submission._pending_jobs.discard(job.id)
+            elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
+                submission._is_completed = True  # type: ignore
+                submission._blocked_jobs.discard(job.id)
+                submission._pending_jobs.discard(job.id)
+                submission._running_jobs.discard(job.id)
+            elif job_status == Status.ABANDONED:
+                submission._is_abandoned = True  # type: ignore
+                submission._running_jobs.discard(job.id)
+                submission._blocked_jobs.discard(job.id)
+                submission._pending_jobs.discard(job.id)
+            cls._set(submission)
+
+            # The submission_status is set later to make sure notification for updating
+            # the submission_status attribute is triggered
+            if submission._is_canceled:
+                cls._set_submission_status(submission, SubmissionStatus.CANCELED, job)
+            elif submission._is_abandoned:
+                cls._set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
+            elif submission._running_jobs:
+                cls._set_submission_status(submission, SubmissionStatus.RUNNING, job)
+            elif submission._pending_jobs:
+                cls._set_submission_status(submission, SubmissionStatus.PENDING, job)
+            elif submission._blocked_jobs:
+                cls._set_submission_status(submission, SubmissionStatus.BLOCKED, job)
+            elif submission._is_completed:
+                cls._set_submission_status(submission, SubmissionStatus.COMPLETED, job)
+            else:
+                cls._set_submission_status(submission, SubmissionStatus.UNDEFINED, job)
+            cls.__logger.debug(
+                f"{job.id} status is {job_status}. Submission status set to `{submission._submission_status}`"
+            )
+
+    @classmethod
+    def _set_submission_status(cls, submission: Submission, new_submission_status: SubmissionStatus, job: Job):
+        if not submission._is_in_context:
+            submission = cls._get(submission)
+        _current_submission_status = submission._submission_status
+        submission._submission_status = new_submission_status
+
+        cls._set(submission)
+
+        if _current_submission_status != submission._submission_status:
+            event = _make_event(
+                submission,
+                EventOperation.UPDATE,
+                "submission_status",
+                submission._submission_status,
+                job_triggered_submission_status_changed=job.id,
+            )
+
+            if not submission._is_in_context:
+                Notifier.publish(event)
+            else:
+                submission._in_context_attributes_changed_collector.append(event)
+
     @classmethod
     def _get_latest(cls, entity: Union[Scenario, Sequence, Task]) -> Optional[Submission]:
         entity_id = entity.id if not isinstance(entity, str) else entity

+ 2 - 65
taipy/core/submission/submission.py

@@ -14,15 +14,13 @@ import uuid
 from datetime import datetime
 from typing import Any, Dict, List, Optional, Set, Union
 
-from taipy.logger._taipy_logger import _TaipyLogger
-
 from .._entity._entity import _Entity
 from .._entity._labeled import _Labeled
 from .._entity._properties import _Properties
 from .._entity._reload import _Reloader, _self_reload, _self_setter
 from .._version._version_manager_factory import _VersionManagerFactory
-from ..job.job import Job, JobId, Status
-from ..notification.event import Event, EventEntityType, EventOperation, _make_event
+from ..job.job import Job, JobId
+from ..notification import Event, EventEntityType, EventOperation, _make_event
 from .submission_id import SubmissionId
 from .submission_status import SubmissionStatus
 
@@ -45,7 +43,6 @@ class Submission(_Entity, _Labeled):
     _MANAGER_NAME = "submission"
     __SEPARATOR = "_"
     lock = threading.Lock()
-    __logger = _TaipyLogger._get_logger()
 
     def __init__(
         self,
@@ -192,66 +189,6 @@ class Submission(_Entity, _Labeled):
     def __ge__(self, other):
         return self.creation_date.timestamp() >= other.creation_date.timestamp()
 
-    def _update_submission_status(self, job: Job):
-        from ._submission_manager_factory import _SubmissionManagerFactory
-
-        with self.lock:
-            submission_manager = _SubmissionManagerFactory._build_manager()
-            submission = submission_manager._get(self)
-            if submission._submission_status == SubmissionStatus.FAILED:
-                return
-
-            job_status = job.status
-            if job_status == Status.FAILED:
-                submission._submission_status = SubmissionStatus.FAILED
-                submission_manager._set(submission)
-                self.__logger.debug(
-                    f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}"
-                )
-                return
-            if job_status == Status.CANCELED:
-                submission._is_canceled = True
-            elif job_status == Status.BLOCKED:
-                submission._blocked_jobs.add(job.id)
-                submission._pending_jobs.discard(job.id)
-            elif job_status == Status.PENDING or job_status == Status.SUBMITTED:
-                submission._pending_jobs.add(job.id)
-                submission._blocked_jobs.discard(job.id)
-            elif job_status == Status.RUNNING:
-                submission._running_jobs.add(job.id)
-                submission._pending_jobs.discard(job.id)
-            elif job_status == Status.COMPLETED or job_status == Status.SKIPPED:
-                submission._is_completed = True  # type: ignore
-                submission._blocked_jobs.discard(job.id)
-                submission._pending_jobs.discard(job.id)
-                submission._running_jobs.discard(job.id)
-            elif job_status == Status.ABANDONED:
-                submission._is_abandoned = True  # type: ignore
-                submission._running_jobs.discard(job.id)
-                submission._blocked_jobs.discard(job.id)
-                submission._pending_jobs.discard(job.id)
-            submission_manager._set(submission)
-
-            # The submission_status is set later to make sure notification for updating
-            # the submission_status attribute is triggered
-            if submission._is_canceled:
-                submission.submission_status = SubmissionStatus.CANCELED  # type: ignore
-            elif submission._is_abandoned:
-                submission.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
-            elif submission._running_jobs:
-                submission.submission_status = SubmissionStatus.RUNNING  # type: ignore
-            elif submission._pending_jobs:
-                submission.submission_status = SubmissionStatus.PENDING  # type: ignore
-            elif submission._blocked_jobs:
-                submission.submission_status = SubmissionStatus.BLOCKED  # type: ignore
-            elif submission._is_completed:
-                submission.submission_status = SubmissionStatus.COMPLETED  # type: ignore
-            else:
-                submission.submission_status = SubmissionStatus.UNDEFINED  # type: ignore
-            self.__logger.debug(
-                f"{job.id} status is {job_status}. Submission status set to " f"{submission._submission_status}"
-            )
-
     def is_finished(self) -> bool:
         """Indicate if the submission is finished.
 

+ 10 - 3
taipy/gui_core/_context.py

@@ -128,7 +128,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
             with self.lock:
                 self.jobs_list = None
         elif event.entity_type == EventEntityType.SUBMISSION:
-            self.submission_status_callback(event.entity_id)
+            self.submission_status_callback(event.entity_id, event)
         elif event.entity_type == EventEntityType.DATA_NODE:
             with self.lock:
                 self.data_nodes_by_owner = None
@@ -146,7 +146,7 @@ class _GuiCoreContext(CoreEventConsumerBase):
             {"scenario": scenario_id or True},
         )
 
-    def submission_status_callback(self, submission_id: t.Optional[str]):
+    def submission_status_callback(self, submission_id: t.Optional[str] = None, event: t.Optional[Event] = None):
         if not submission_id or not is_readable(t.cast(SubmissionId, submission_id)):
             return
         try:
@@ -182,7 +182,14 @@ class _GuiCoreContext(CoreEventConsumerBase):
                             self.gui._call_user_callback(
                                 client_id,
                                 submission_name,
-                                [core_get(submission.entity_id), {"submission_status": new_status.name}],
+                                [
+                                    core_get(submission.id),
+                                    {
+                                        "submission_status": new_status.name,
+                                        "submittable_entity": core_get(submission.entity_id),
+                                        **(event.metadata if event else {}),
+                                    },
+                                ],
                                 submission.properties.get("module_context"),
                             )
 

+ 3 - 3
taipy/gui_core/viselements.json

@@ -207,15 +207,15 @@
                     {
                         "name": "on_submission_change",
                         "type": "Callback",
-                        "doc": "The name of the function that is triggered when a submission status is changed.<br/><br/>All the parameters of that function are optional:\n<ul>\n<li>state (<code>State^</code>): the state instance.</li>\n<li>submittable (Submittable): the entity (usually a Scenario) that was submitted.</li>\n<li>details (dict): the details on this callback's invocation.<br/>\nThis dictionary has the following keys:\n<ul>\n<li>submission_status (str): the new status of the submission (possible values: SUBMITTED, COMPLETED, CANCELED, FAILED, BLOCKED, WAITING, RUNNING).</li>\n<li>job: the Job (if any) that is at the origin of the submission status change.</li>\n</ul>",
+                        "doc": "The name of the function that is triggered when a submission status is changed.<br/><br/>All the parameters of that function are optional:\n<ul>\n<li>state (<code>State^</code>): the state instance.</li>\n<li>submission (Submission): the submission entity containing submission information.</li>\n<li>details (dict): the details on this callback's invocation.<br/>\nThis dictionary has the following keys:\n<ul>\n<li>submission_status (str): the new status of the submission (possible values: SUBMITTED, COMPLETED, CANCELED, FAILED, BLOCKED, WAITING, RUNNING).</li>\n<li>job: the Job (if any) that is at the origin of the submission status change.</li>\n<li>submittable_entity: submittable (Submittable): the entity (usually a Scenario) that was submitted.</li>\n</ul>",
                         "signature": [
                             [
                                 "state",
                                 "State"
                             ],
                             [
-                                "submittable",
-                                "Submittable"
+                                "submission",
+                                "Submission"
                             ],
                             [
                                 "details",

+ 16 - 0
tests/core/notification/test_notifier.py

@@ -19,6 +19,7 @@ from taipy.core.notification._topic import _Topic
 from taipy.core.notification.event import Event
 from taipy.core.notification.notifier import Notifier
 from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+from taipy.core.submission.submission_status import SubmissionStatus
 
 
 def test_register():
@@ -742,6 +743,21 @@ def test_publish_submission_event():
         and event.attribute_name == expected_attribute_names[i]
         for i, event in enumerate(published_events)
     )
+    assert "job_triggered_submission_status_changed" in published_events[4].metadata
+    assert published_events[4].metadata["job_triggered_submission_status_changed"] == job.id
+
+    # Test updating submission_status manually will not add the job_triggered_submission_status_changed
+    # to the metadata as no job was used to update the submission_status
+    submission.submission_status = SubmissionStatus.CANCELED
+
+    assert registration_queue.qsize() == 1
+    published_event = registration_queue.get()
+
+    assert published_event.entity_type == EventEntityType.SUBMISSION
+    assert published_event.entity_id == submission.id
+    assert published_event.operation == EventOperation.UPDATE
+    assert published_event.attribute_name == "submission_status"
+    assert "job_triggered_submission_status_changed" not in published_event.metadata
 
 
 def test_publish_deletion_event():

+ 9 - 5
tests/core/submission/test_submission.py

@@ -124,7 +124,7 @@ def __test_update_submission_status(job_ids, expected_submission_status):
     submission.jobs = [jobs[job_id] for job_id in job_ids]
     for job_id in job_ids:
         job = jobs[job_id]
-        submission._update_submission_status(job)
+        _SubmissionManagerFactory._build_manager()._update_submission_status(submission, job)
     assert submission.submission_status == expected_submission_status
 
 
@@ -470,29 +470,33 @@ def test_auto_set_and_reload_properties():
     ],
 )
 def test_update_submission_status_with_single_job_completed(job_statuses, expected_submission_statuses):
+    submission_manager = _SubmissionManagerFactory._build_manager()
+
     job = MockJob("job_id", Status.SUBMITTED)
     submission = Submission("submission_id", "ENTITY_TYPE", "entity_config_id")
-    _SubmissionManagerFactory._build_manager()._set(submission)
+    submission_manager._set(submission)
 
     assert submission.submission_status == SubmissionStatus.SUBMITTED
 
     for job_status, submission_status in zip(job_statuses, expected_submission_statuses):
         job.status = job_status
-        submission._update_submission_status(job)
+        submission_manager._update_submission_status(submission, job)
         assert submission.submission_status == submission_status
 
 
 def __test_update_submission_status_with_two_jobs(job_ids, job_statuses, expected_submission_statuses):
+    submission_manager = _SubmissionManagerFactory._build_manager()
+
     jobs = {job_id: MockJob(job_id, Status.SUBMITTED) for job_id in job_ids}
     submission = Submission("submission_id", "ENTITY_TYPE", "entity_config_id")
-    _SubmissionManagerFactory._build_manager()._set(submission)
+    submission_manager._set(submission)
 
     assert submission.submission_status == SubmissionStatus.SUBMITTED
 
     for (job_id, job_status), submission_status in zip(job_statuses, expected_submission_statuses):
         job = jobs[job_id]
         job.status = job_status
-        submission._update_submission_status(job)
+        submission_manager._update_submission_status(submission, job)
         assert submission.submission_status == submission_status