فهرست منبع

Implement UTs for EventConsumer

jean-robin medori 1 ماه پیش
والد
کامیت
03af3e9a46

+ 0 - 1
tests/core/notification/test_notifier.py

@@ -14,7 +14,6 @@ from queue import SimpleQueue
 from taipy.common.config import Config, Frequency
 from taipy.core import taipy as tp
 from taipy.core._version._version_manager_factory import _VersionManagerFactory
-from taipy.core.common.frequency import Frequency
 from taipy.core.notification import EventEntityType, EventOperation, _Registration
 from taipy.core.notification._topic import _Topic
 from taipy.core.notification.event import Event

+ 10 - 0
tests/event/__init__.py

@@ -0,0 +1,10 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.

+ 135 - 0
tests/event/conftest.py

@@ -0,0 +1,135 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import os
+import pickle
+import shutil
+from datetime import datetime
+from queue import Queue
+from unittest.mock import patch
+
+import pandas as pd
+import pytest
+from sqlalchemy import create_engine, text
+from sqlalchemy.orm import close_all_sessions
+
+from taipy.common.config import Config
+from taipy.common.config.checker._checker import _Checker
+from taipy.common.config.common.frequency import Frequency
+from taipy.common.config.common.scope import Scope
+from taipy.core._orchestrator._orchestrator_factory import _OrchestratorFactory
+from taipy.core._version._version import _Version
+from taipy.core._version._version_manager_factory import _VersionManagerFactory
+from taipy.core.config import (
+    _ConfigIdChecker,
+    _CoreSectionChecker,
+    _DataNodeConfigChecker,
+    _JobConfigChecker,
+    _ScenarioConfigChecker,
+    _TaskConfigChecker,
+)
+from taipy.core.cycle._cycle_manager_factory import _CycleManagerFactory
+from taipy.core.cycle._cycle_model import _CycleModel
+from taipy.core.cycle.cycle import Cycle
+from taipy.core.cycle.cycle_id import CycleId
+from taipy.core.data._data_manager_factory import _DataManagerFactory
+from taipy.core.data._data_model import _DataNodeModel
+from taipy.core.data.in_memory import DataNodeId, InMemoryDataNode
+from taipy.core.job._job_manager_factory import _JobManagerFactory
+from taipy.core.job.job import Job
+from taipy.core.job.job_id import JobId
+from taipy.core.notification.notifier import Notifier
+from taipy.core.orchestrator import Orchestrator
+from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
+from taipy.core.scenario._scenario_model import _ScenarioModel
+from taipy.core.scenario.scenario import Scenario
+from taipy.core.scenario.scenario_id import ScenarioId
+from taipy.core.sequence._sequence_manager_factory import _SequenceManagerFactory
+from taipy.core.sequence.sequence import Sequence
+from taipy.core.sequence.sequence_id import SequenceId
+from taipy.core.submission._submission_manager_factory import _SubmissionManagerFactory
+from taipy.core.submission.submission import Submission
+from taipy.core.task._task_manager_factory import _TaskManagerFactory
+from taipy.core.task.task import Task, TaskId
+
+current_time = datetime.now()
+
+@pytest.fixture(scope="function")
+def current_datetime():
+    return current_time
+
+
+@pytest.fixture(scope="function")
+def data_node():
+    return InMemoryDataNode(
+        "data_node",
+        Scope.SCENARIO,
+        version="random_version_number",
+        properties={"default_data": "foo"},
+    )
+
+
+@pytest.fixture(scope="function")
+def scenario(cycle):
+    return Scenario(
+        "sc",
+        set(),
+        {},
+        set(),
+        ScenarioId("SCENARIO_sc_id"),
+        current_time,
+        is_primary=False,
+        tags={"foo"},
+        cycle=None,
+        version="random_version_number",
+    )
+
+
+@pytest.fixture(scope="function")
+def cycle():
+    example_date = datetime.fromisoformat("2021-11-11T11:11:01.000001")
+    return Cycle(
+        Frequency.DAILY,
+        {},
+        creation_date=example_date,
+        start_date=example_date,
+        end_date=example_date,
+        name="cc",
+        id=CycleId("cc_id"),
+    )
+
+
+@pytest.fixture(scope="function")
+def sequence(scenario):
+    return Sequence({}, [], SequenceId(f"SEQUENCE_sequence_{scenario.id}"), version="random_version_number")
+
+
+@pytest.fixture(scope="function")
+def submission():
+    return Submission("entity_id", "entity_type")
+
+
+@pytest.fixture(scope="function", autouse=True)
+def init(init_notifier):
+    init_notifier()
+
+    with patch("sys.argv", ["prog"]):
+        yield
+
+    init_notifier()
+
+
+@pytest.fixture
+def init_notifier():
+    def _init_notifier():
+        Notifier._topics_registrations_list = {}
+
+    return _init_notifier

+ 165 - 0
tests/event/test_consumer__on_datanode_created.py

@@ -0,0 +1,165 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import DataNode, Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, datanode: DataNode):
+    ...
+
+
+def cb_1(event: Event, datanode: DataNode, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: DataNode):
+    ...
+
+
+def test_on_datanode_created(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0)
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # No config provided, so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node]
+
+
+def test_on_datanode_created_multiple_configs(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node]
+
+
+def test_on_datanode_created_multiple_configs_no_matching(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION,
+                      entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert not f_val  # DataNode is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_created_with_args_and_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata.get("predefined_args") == [data_node]
+
+
+def test_on_datanode_created_with_args_and_not_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is False  # datanode is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_created_with_broadcast():
+    consumer = EventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_created(callback=cb_for_state, broadcast=True)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 149 - 0
tests/event/test_consumer__on_datanode_deleted.py

@@ -0,0 +1,149 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, datanode: str):
+    ...
+
+
+def cb_1(event: Event, datanode: str, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: str):
+    ...
+
+
+def test_on_datanode_deleted(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0)
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    operation=EventOperation.DELETION,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # No config provided, so the datanode passes the filter
+        assert event.metadata["predefined_args"] == [data_node.id]
+
+
+def test_on_datanode_deleted_multiple_configs(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+        assert event.metadata["predefined_args"] == [data_node.id]
+
+
+def test_on_datanode_deleted_multiple_configs_no_matching(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION,
+                      entity_id=data_node.id)
+        assert actual_filter is not None
+        f_val = actual_filter(event)
+        assert not f_val  # Datanode is not from any of the provided configs, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_deleted_with_args_and_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        filter_value = actual_filter(event)
+        assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+        assert event.metadata.get("predefined_args") == [data_node.id]
+
+
+def test_on_datanode_deleted_with_args_and_not_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        filter_value = actual_filter(event)
+        assert filter_value is False  # The datanode is not from WRONG_CFG, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_deleted_with_broadcast(data_node):
+    consumer = EventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_for_state, broadcast=True)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=True)

+ 186 - 0
tests/event/test_consumer__on_datanode_written.py

@@ -0,0 +1,186 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from typing import Any
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import DataNode, Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, datanode: DataNode, data: Any):
+    ...
+
+
+def cb_1(event: Event, datanode: DataNode, data: Any, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: DataNode, data: Any):
+    ...
+
+
+def test_on_datanode_written(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0)
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # No config provided, so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_multiple_configs(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_multiple_configs_no_matching(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert not f_val  # Datanode is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_written_with_args_and_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_with_args_and_not_matching_config(data_node):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is False  # datanode is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_written_with_broadcast(data_node):
+    consumer = EventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_datanode_written(callback=cb_for_state, broadcast=True)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 93 - 0
tests/event/test_consumer__on_event.py

@@ -0,0 +1,93 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import pytest
+
+from taipy import Gui
+from taipy.core.notification import Event, EventEntityType, EventOperation, _Topic
+from taipy.event._event_callback import _Callback
+from taipy.event.event_consumer import EventConsumer
+from taipy.exceptions import NoGuiDefinedInEventConsumer
+
+
+def cb_0(event: Event, extra:str):
+    ...
+
+
+def cb_1(event: Event):
+    ...
+
+
+def cb_2(event: Event):
+    ...
+
+
+def cb_for_state(state, event: Event):
+    ...
+
+
+def test_on_event():
+    consumer = EventConsumer()
+    consumer.on_event(callback=cb_0, callback_args=["foo"])
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SCENARIO)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    consumer.on_event(callback=cb_0, callback_args=["baz"], operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_0, callback_args=["qux"], entity_type=EventEntityType.SEQUENCE,
+                      operation=EventOperation.SUBMISSION)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO) # duplicate topic
+
+    assert consumer._registration is not None
+    registration = consumer._registration
+    assert registration.registration_id is not None
+    assert registration.queue is not None
+    assert len(registration.topics) == 5  # 5 unique topics
+    topic_1 = _Topic()
+    topic_2 = _Topic(entity_type=EventEntityType.SCENARIO)
+    topic_3 = _Topic(entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    topic_4 = _Topic(operation=EventOperation.CREATION)
+    topic_5 = _Topic(entity_type=EventEntityType.SEQUENCE, operation=EventOperation.SUBMISSION)
+    assert topic_1 in registration.topics
+    assert topic_2 in registration.topics
+    assert topic_3 in registration.topics
+    assert topic_4 in registration.topics
+    assert topic_5 in registration.topics
+
+    assert consumer._gui is None
+
+    assert len(consumer._topic_callbacks_map) == 5  # 5 unique topics
+    assert topic_1 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_1] == [_Callback(cb_0, ["foo"])]
+    assert topic_2 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_2] == [_Callback(cb_1), _Callback(cb_2)]
+    assert topic_3 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_3] == [_Callback(cb_2)]
+    assert topic_4 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_4] == [_Callback(cb_0, ["baz"])]
+    assert topic_5 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_5] == [_Callback(cb_0, ["qux"])]
+
+
+def test_on_event_for_state():
+    consumer = EventConsumer(gui=Gui())
+    consumer.on_event(callback=cb_for_state, broadcast=True)
+
+    assert consumer._gui is not None
+    assert len(consumer._topic_callbacks_map) == 1
+    topic = _Topic()
+    assert topic in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic] == [_Callback(cb_for_state, broadcast=True)]
+
+
+def test_on_event_missing_gui():
+    consumer = EventConsumer()
+    with pytest.raises(NoGuiDefinedInEventConsumer):
+        consumer.on_event(callback=cb_for_state, broadcast=True)
+    assert len(consumer._topic_callbacks_map) == 0

+ 165 - 0
tests/event/test_consumer__on_scenario_created.py

@@ -0,0 +1,165 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui, Scenario
+from taipy.core.config import ScenarioConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, scenario: Scenario):
+    ...
+
+
+def cb_1(event: Event, scenario: Scenario, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, scenario: Scenario):
+    ...
+
+
+def test_on_scenario_created(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0)
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True  # No config provided, so the scenario passes the filter
+            assert event.metadata["predefined_args"] == [scenario]
+
+
+def test_on_scenario_created_multiple_configs(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1", ScenarioConfig("sc_2"), "sc"])
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True  # The scenario is from config 'sc', so the scenario passes the filter
+            assert event.metadata["predefined_args"] == [scenario]
+
+
+def test_on_scenario_created_multiple_configs_no_matching(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1"])
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION,
+                      entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert not f_val  # Scenario is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_scenario_created_with_args_and_matching_config(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_1, callback_args=["foo"], scenario_config="sc")
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True # scenario is from config 'sc', so the scenario passes the filter
+            assert event.metadata.get("predefined_args") == [scenario]
+
+
+def test_on_scenario_created_with_args_and_not_matching_config(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_1, callback_args=["foo"], scenario_config="WRONG_CFG")
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is False  # scenario is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the scenario in the metadata
+
+
+def test_on_scenario_created_with_broadcast():
+    consumer = EventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_created(callback=cb_for_state, broadcast=True)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 150 - 0
tests/event/test_consumer__on_scenario_deleted.py

@@ -0,0 +1,150 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui
+from taipy.core.config import ScenarioConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, scenario: str):
+    ...
+
+
+def cb_1(event: Event, scenario: str, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, scenario: str):
+    ...
+
+
+def test_on_scenario_deleted(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0)
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # No config provided, so the scenario passes the filter
+        assert event.metadata["predefined_args"] == [scenario.id]
+
+
+def test_on_scenario_deleted_multiple_configs(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1", ScenarioConfig("sc_2"), "sc"])
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # The scenario is from config 'sc', so the scenario passes the filter
+        assert event.metadata["predefined_args"] == [scenario.id]
+
+
+def test_on_scenario_deleted_multiple_configs_no_matching(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1"])
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION,
+                      entity_id=scenario.id)
+        assert actual_filter is not None
+        f_val = actual_filter(event)
+        assert not f_val  # Scenario is not from any of the provided configs, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_on_scenario_deleted_with_args_and_matching_config(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_1, callback_args=["foo"], scenario_config="sc")
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        filter_value = actual_filter(event)
+        assert filter_value is True # scenario is from config 'sc', so the scenario passes the filter
+        assert event.metadata.get("predefined_args") == [scenario.id]
+
+
+def test_on_scenario_deleted_with_args_and_not_matching_config(scenario):
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_1, callback_args=["foo"], scenario_config="WRONG_CFG")
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        filter_value = actual_filter(event)
+        assert filter_value is False  # scenario is not from WRONG_CFG, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None # No need to cache the scenario in the metadata
+
+
+def test_on_scenario_deleted_with_broadcast(scenario):
+    consumer = EventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_for_state, broadcast=True)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 284 - 0
tests/event/test_consumer__on_submission_finished.py

@@ -0,0 +1,284 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui, Scenario, Submission, SubmissionStatus
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import EventConsumer
+
+
+def cb_0(event: Event, submittable: Scenario, submission: Submission):
+    ...
+
+
+def cb_1(event: Event, submittable: Scenario, submission: Submission, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, submittable: Scenario, submission: Submission):
+    ...
+
+
+def test_on_scenario_submission_finished(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.FAILED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.CANCELED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+
+def test_filter_false__wrong_status(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        # No status value
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      )
+        filter_value = actual_filter(event)
+        assert filter_value is False  # no status value
+        assert event.metadata.get("predefined_args") is None
+
+        # wrong status
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.BLOCKED,
+                      )
+        filter_value = actual_filter(event)
+        assert filter_value is False  # status is not finished
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_false__config_ids_and_sequence(scenario, sequence, submission):
+    submission._entity_id = sequence.id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=scenario.config_id)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(submission.id)
+            assert filter_value is False  # Sequence submission do not have config so the event does not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_false__not_matching_config_ids(scenario, submission):
+    submission._entity_id = scenario.id
+    submission._entity_config_id = scenario.config_id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=["NOT_MATCHING_CONFIG_ID"])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(submission.id)
+            # Submission config id is not in the provided list so the event does not pass the filter
+            assert filter_value is False
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_true__with_config(scenario, submission):
+    submission._entity_id = scenario.id
+    submission._entity_config_id = scenario.config_id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=["scenario_cfg", scenario.config_id])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls([mock.call(submission.id), mock.call(scenario.id)])
+            assert filter_value is True
+            assert event.metadata.get("predefined_args") == [scenario, submission]
+
+
+def test_filter_true__without_config(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls([mock.call(submission.id), mock.call(scenario.id)])
+            assert filter_value is True
+            assert event.metadata.get("predefined_args") == [scenario, submission]
+
+
+def test_on_scenario_submission_finished_with_args():
+    consumer = EventConsumer()
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_1, callback_args=["extra"])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["extra"],
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+
+def test_on_scenario_submission_finished_with_args_and_state():
+    consumer = EventConsumer(gui=Gui())
+    with mock.patch("taipy.event.event_consumer.EventConsumer.on_event") as mck:
+        consumer.on_submission_finished(callback=cb_for_state, callback_args=["extra"], broadcast=True)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=["extra"],
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=True)

+ 224 - 0
tests/event/test_consumer__process_event.py

@@ -0,0 +1,224 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from typing import Any, Dict, List
+from unittest import mock
+
+from taipy import Gui, Scenario
+from taipy.core.notification import Event, EventEntityType, EventOperation, _Topic
+from taipy.event._event_callback import _Callback
+from taipy.event.event_consumer import EventConsumer
+
+collector: Dict[str, Any] = {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
+            "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}
+args_collector: Dict[str, List] = {}
+
+
+def init_collector():
+    return {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
+            "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}, {}
+
+
+def cb_0(event: Event, extra:str):
+    collector["cb_0"]+=1
+    if not args_collector.get("cb_0"):
+        args_collector["cb_0"] = [extra]
+    else:
+        args_collector["cb_0"].append(extra)
+    print(f"event created at {event.creation_date} triggered callback cb_0.")  # noqa: T201
+
+
+def cb_1(event: Event):
+    collector["cb_1"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_1.")  # noqa: T201
+
+
+def cb_2(event: Event):
+    collector["cb_2"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_2.")  # noqa: T201
+
+
+def cb_3(event: Event):
+    collector["cb_3"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_3.")  # noqa: T201
+
+
+def cb_for_state(state, event: Event):
+    collector["cb_for_state"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_for_state.")  # noqa: T201
+
+
+def cb_scenario_creation(event: Event, scenario: Scenario, extra_arg: str):
+    collector["cb_scenario_creation"]+=1
+    print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.")  # noqa: T201
+
+
+def cb_scenario_creation_with_state(state, event: Event, scenario: Scenario, extra_arg: str):
+    collector["cb_scenario_creation_with_state"]+=1
+    print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.")  # noqa: T201
+
+
+
+def test_process_event(scenario):
+    global collector
+    global args_collector
+    consumer = EventConsumer()
+    consumer.on_event(callback=cb_0, callback_args=["foo"])
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SCENARIO)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    consumer.on_event(callback=cb_3, operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_0, callback_args=["baz"], operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SEQUENCE, operation=EventOperation.SUBMISSION)
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.JOB,
+                      operation=EventOperation.UPDATE, attribute_name="status")
+
+    collector, args_collector = init_collector()
+    event_1 = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="bar",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    consumer.process_event(event_1)
+
+    assert collector["cb_0"] == 2
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 1
+    assert collector["cb_3"] == 1
+
+    collector, args_collector = init_collector()
+    event_2 = Event(
+        entity_type=EventEntityType.SEQUENCE,
+        operation=EventOperation.SUBMISSION,
+        entity_id="quux",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    consumer.process_event(event_2)
+
+    assert collector["cb_0"] == 1
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 0
+    assert collector["cb_3"] == 0
+    collector, args_collector = init_collector()
+
+    collector, args_collector = init_collector()
+    event_3 = Event(
+        entity_type=EventEntityType.JOB,
+        operation=EventOperation.UPDATE,
+        entity_id="corge",
+        attribute_name="status",
+        attribute_value="COMPLETED",
+        metadata={},
+    )
+    consumer.process_event(event_3)
+
+    assert collector["cb_0"] == 1
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 0
+    assert collector["cb_3"] == 0
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_state():
+    consumer = EventConsumer(gui=Gui())
+    consumer.on_event(callback=cb_for_state, broadcast=True)
+
+    event_1 = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
+        consumer.process_event(event_1)
+        mock_broadcast.assert_called_once_with(cb_for_state, [event_1])
+
+
+def test_process_event_with_filter():
+    global collector
+    global args_collector
+    def filt(event: Event) -> bool:
+        return event.metadata.get("foo") == "bar"
+
+    consumer = EventConsumer()
+    consumer.on_event(callback=cb_0,
+                      callback_args=["foo"],
+                      entity_type=EventEntityType.SCENARIO,
+                      operation=EventOperation.CREATION,
+                      filter=filt,
+                      broadcast=False)
+
+    topic = _Topic(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION)
+    assert len(consumer._topic_callbacks_map) == 1
+    assert consumer._topic_callbacks_map[topic] == [_Callback(cb_0, ["foo"], False, filt)]
+
+    collector, args_collector = init_collector()
+    event_matching_filter = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        metadata={"foo": "bar"},
+    )
+    consumer.process_event(event_matching_filter)
+
+    assert collector["cb_0"] == 1
+
+    collector, args_collector = init_collector()
+    event_not_matching_filter = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        metadata={"baz": "qux"},
+    )
+    consumer.process_event(event_not_matching_filter)
+
+    assert collector["cb_0"] == 0
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_predefined_args(scenario):
+    global collector
+    global args_collector
+    consumer = EventConsumer()
+    consumer.on_event(callback=cb_scenario_creation, callback_args=["foo"])
+    collector, args_collector = init_collector()
+    event = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={"predefined_args": [scenario]},
+    )
+    consumer.process_event(event)
+
+    assert collector["cb_scenario_creation"] == 1
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_predefined_args_and_state(scenario):
+    consumer = EventConsumer(Gui())
+    consumer.on_event(callback=cb_scenario_creation_with_state, callback_args=["foo"], broadcast=True)
+    event = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={"predefined_args": [scenario]},
+    )
+
+    with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
+        consumer.process_event(event)
+        mock_broadcast.assert_called_once_with(cb_scenario_creation_with_state, [event, scenario, "foo"])

+ 0 - 0
tests/tools/__init__.py