浏览代码

added small test for getting job from metadata

Toan Quach 1 年之前
父节点
当前提交
c9bb5cfe8e
共有 1 个文件被更改,包括 16 次插入0 次删除
  1. 16 0
      tests/core/notification/test_notifier.py

+ 16 - 0
tests/core/notification/test_notifier.py

@@ -19,6 +19,7 @@ from taipy.core.notification._topic import _Topic
 from taipy.core.notification.event import Event
 from taipy.core.notification.notifier import Notifier
 from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+from taipy.core.submission.submission_status import SubmissionStatus
 
 
 def test_register():
@@ -742,6 +743,21 @@ def test_publish_submission_event():
         and event.attribute_name == expected_attribute_names[i]
         for i, event in enumerate(published_events)
     )
+    assert "job_triggered_submission_status_changed" in published_events[4].metadata
+    assert published_events[4].metadata["job_triggered_submission_status_changed"] == job.id
+
+    # Test updating submission_status manually will not add the job_triggered_submission_status_changed
+    # to the metadata as no job was used to update the submission_status
+    submission.submission_status = SubmissionStatus.CANCELED
+
+    assert registration_queue.qsize() == 1
+    published_event = registration_queue.get()
+
+    assert published_event.entity_type == EventEntityType.SUBMISSION
+    assert published_event.entity_id == submission.id
+    assert published_event.operation == EventOperation.UPDATE
+    assert published_event.attribute_name == "submission_status"
+    assert "job_triggered_submission_status_changed" not in published_event.metadata
 
 
 def test_publish_deletion_event():