Browse Source

Event consumer API simplification into develop (#2566)

* implement EventConsumer

* Implement UTs for EventConsumer

* fix imports

* Update __init__.py

* Update __init__.py

* Decorate not implemented method with @abstractmethod

* oups

* Make mypy happy

* Make ruff happy

* Update codeql-analysis.yml

* fix UT

* fix UT

* Feedback from Eric and Florian: Systematically pass the gui as param

* Attempt to fix pipfile.lock

* Revert "Attempt to fix pipfile.lock"

This reverts commit ec6dc934642cd3a920e1f39116cb63334e7896f5.

* #2597 Fix missing parent_ids

* Yet another consumer API version

* Linter + Giang's feedback

* Linter + Giang's feedback

* Linter + Giang's feedback
Jean-Robin 1 week ago
parent
commit
85af59e2bb

+ 3 - 3
.github/workflows/codeql-analysis.yml

@@ -27,12 +27,12 @@ jobs:
       uses: actions/checkout@v4
 
     - name: Initialize CodeQL
-      uses: github/codeql-action/init@v2
+      uses: github/codeql-action/init@v3
       with:
         languages: ${{ matrix.language }}
 
     - name: Autobuild
-      uses: github/codeql-action/autobuild@v2
+      uses: github/codeql-action/autobuild@v3
 
     - name: Perform CodeQL Analysis
-      uses: github/codeql-action/analyze@v2
+      uses: github/codeql-action/analyze@v3

+ 1 - 0
taipy/common/_check_dependencies.py

@@ -41,6 +41,7 @@ def _check_dependency_is_installed(
 class EnterpriseEditionUtils:
     _TAIPY_ENTERPRISE_MODULE = "taipy.enterprise"
     _TAIPY_ENTERPRISE_CORE_MODULE = _TAIPY_ENTERPRISE_MODULE + ".core"
+    _TAIPY_ENTERPRISE_EVENT_PACKAGE = _TAIPY_ENTERPRISE_MODULE + ".event"
 
     @classmethod
     def _using_enterprise(cls) -> bool:

+ 2 - 2
taipy/core/data/json.py

@@ -113,7 +113,7 @@ class JSONDataNode(DataNode, _FileDataNodeMixin):
     @_self_reload(DataNode._MANAGER_NAME)
     def encoder(self) -> json.JSONEncoder:
         """The JSON encoder that is used to write into the JSON file."""
-        return self._encoder
+        return self._encoder # type: ignore[return-value]
 
     @encoder.setter
     def encoder(self, encoder: json.JSONEncoder) -> None:
@@ -123,7 +123,7 @@ class JSONDataNode(DataNode, _FileDataNodeMixin):
     @_self_reload(DataNode._MANAGER_NAME)
     def decoder(self) -> json.JSONDecoder:
         """The JSON decoder that is used to read from the JSON file."""
-        return self._decoder
+        return self._decoder # type: ignore[return-value]
 
     @decoder.setter
     def decoder(self, decoder: json.JSONDecoder) -> None:

+ 0 - 1
taipy/core/notification/__init__.py

@@ -25,7 +25,6 @@ object) must be instantiated with an associated event queue.
 
 from ._registration import _Registration
 from ._topic import _Topic
-from .core_event_consumer import CoreEventConsumerBase
 from .event import Event, EventEntityType, EventOperation, _make_event
 from .notifier import Notifier, _publish_event
 from .registration_id import RegistrationId

+ 1 - 1
taipy/core/notification/core_event_consumer.py → taipy/core/notification/_core_event_consumer.py

@@ -16,7 +16,7 @@ from queue import Empty, SimpleQueue
 from .event import Event
 
 
-class CoreEventConsumerBase(threading.Thread):
+class _CoreEventConsumerBase(threading.Thread):
     """Abstract base class for implementing a Core event consumer.
 
     This class provides a framework for consuming events from a queue in a separate thread.

+ 2 - 7
taipy/core/scenario/_scenario_manager.py

@@ -186,14 +186,9 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
             sequences=sequences,
         )
 
-        for task in tasks:
-            if scenario_id not in task._parent_ids:
-                task._parent_ids.update([scenario_id])
-                _task_manager._update(task)
-
         for dn in additional_data_nodes.values():
             if scenario_id not in dn._parent_ids:
-                dn._parent_ids.update([scenario_id])
+                dn._parent_ids.add(scenario_id)
                 _data_manager._update(dn)
 
         cls._repository._save(scenario)
@@ -377,7 +372,7 @@ class _ScenarioManager(_Manager[Scenario], _VersionMixin):
 
     @classmethod
     def _tag(cls, scenario: Scenario, tag: str) -> None:
-        tags = scenario.properties.get(cls._AUTHORIZED_TAGS_KEY, set())
+        tags = scenario.properties.get(cls._AUTHORIZED_TAGS_KEY, set())  # type: ignore[var-annotated]
         if len(tags) > 0 and tag not in tags:
             raise UnauthorizedTagError(f"Tag `{tag}` not authorized by scenario configuration `{scenario.config_id}`")
         scenario._add_tag(tag)

+ 12 - 5
taipy/core/scenario/scenario.py

@@ -603,11 +603,18 @@ class Scenario(_Entity, Submittable, _Labeled):
         Arguments:
             name (str): The name of the sequence to remove.
         """
-        seq_id = self.sequences[name].id
-        _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
-        _sequences.pop(name)
-        self.sequences = _sequences  # type: ignore
-        Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq_id))
+        if seq := self.sequences.get(name):
+            from taipy.core.task._task_manager_factory import _TaskManagerFactory
+
+            for task in seq.tasks.values():
+                task._parent_ids.discard(seq.id)
+                _TaskManagerFactory._build_manager()._repository._save(task)
+            _sequences = _Reloader()._reload(self._MANAGER_NAME, self)._sequences
+            _sequences.pop(name)
+
+            self.sequences = _sequences  # type: ignore
+            Notifier.publish(Event(EventEntityType.SEQUENCE, EventOperation.DELETION, entity_id=seq.id))
+
 
     def remove_sequences(self, sequence_names: List[str]) -> None:
         """Remove multiple sequences from the scenario.

+ 8 - 5
taipy/core/task/_task_manager.py

@@ -80,8 +80,8 @@ class _TaskManager(_Manager[Task], _VersionMixin):
         for task_config in task_configs:
             data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.input_configs])
             data_node_configs.update([Config.data_nodes[dnc.id] for dnc in task_config.output_configs])
-
-        data_nodes = _DataManagerFactory._build_manager()._bulk_get_or_create(
+        _data_manager = _DataManagerFactory._build_manager()
+        data_nodes = _data_manager._bulk_get_or_create(
             list(data_node_configs), cycle_id, scenario_id
         )
         tasks_configs_and_owner_id = []
@@ -102,6 +102,8 @@ class _TaskManager(_Manager[Task], _VersionMixin):
         tasks = []
         for task_config, owner_id in tasks_configs_and_owner_id:
             if task := tasks_by_config.get((task_config, owner_id)):
+                task._parent_ids.add(scenario_id)
+                cls._update(task)
                 tasks.append(task)
             else:
                 version = _VersionManagerFactory._build_manager()._get_latest_version()
@@ -122,13 +124,14 @@ class _TaskManager(_Manager[Task], _VersionMixin):
                     inputs,
                     outputs,
                     owner_id=owner_id,
-                    parent_ids=set(),
+                    parent_ids={scenario_id} if scenario_id else set(),
                     version=version,
                     skippable=skippable,
                 )
                 for dn in set(inputs + outputs):
-                    dn._parent_ids.update([task.id])
-                cls._create(task)
+                    dn._parent_ids.add(task.id)
+                    _data_manager._update(dn)
+                cls._repository._save(task)
                 Notifier.publish(_make_event(task, EventOperation.CREATION))
                 tasks.append(task)
         return tasks

+ 13 - 0
taipy/event/__init__.py

@@ -0,0 +1,13 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from ..core.notification.event import Event, EventEntityType, EventOperation
+from .event_consumer import GuiEventConsumer

+ 25 - 0
taipy/event/_event_callback.py

@@ -0,0 +1,25 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from dataclasses import dataclass, field
+from typing import Callable, List, Optional
+
+from taipy.core.notification import Event
+
+
+@dataclass
+class _Callback:
+    callback: Callable
+    args: Optional[List] = field(default_factory=list)
+    broadcast: Optional[bool] = False
+    filter: Optional[Callable[[Event], bool]] = None
+
+    def __hash__(self):
+        return hash(self.callback)

+ 29 - 0
taipy/event/_event_processor.py

@@ -0,0 +1,29 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from abc import abstractmethod
+
+from taipy.core.notification import Event
+
+
+class _AbstractEventProcessor:
+    """Abstract base class for implementing an event processor."""
+
+    @classmethod
+    @abstractmethod
+    def process_event(cls, event_consumer, event: Event):
+        raise NotImplementedError("Subclasses must implement this method.")
+
+class _EventProcessor(_AbstractEventProcessor):
+
+    @classmethod
+    def process_event(cls, event_consumer, event: Event):
+        event_consumer._process_event(event)
+

+ 1253 - 0
taipy/event/event_consumer.py

@@ -0,0 +1,1253 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from typing import Callable, Dict, List, Optional, Union
+
+from taipy import DataNode, Gui, Scenario, Submission, SubmissionStatus
+from taipy.common._check_dependencies import EnterpriseEditionUtils
+from taipy.common.logger._taipy_logger import _TaipyLogger
+from taipy.core.common._utils import _load_fct
+from taipy.core.config import DataNodeConfig, ScenarioConfig, TaskConfig
+from taipy.core.notification import (
+    Event,
+    EventEntityType,
+    EventOperation,
+    Notifier,
+    _Registration,
+    _Topic,
+)
+from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
+from taipy.event._event_callback import _Callback
+from taipy.event._event_processor import _AbstractEventProcessor, _EventProcessor
+from taipy.exceptions import NoGuiDefinedInEventConsumer
+
+
+class GuiEventConsumer(_CoreEventConsumerBase):
+    """
+    The Taipy event consumer service.
+
+    This service listens for events in a Taipy application and triggers callback
+    executions when events matching specific topics are produced. The service handle
+    both cases where callbacks are broadcast to all states or executed once on the
+    server side.
+
+    The main method to use is `on_event()^`, that registers a callback to a topic.
+
+    Before starting the event consumer service, register each callback to a topic.
+    The topics are defined by the entity type, the entity id, the operation, and the
+    attribute name of the events. If an event matching the provided topic is produced,
+    the callback execution is triggered.
+
+    For more information about the event attributes please refer to the `Event^` class.
+
+    !!! note "Filters"
+
+        For each registered callback, you can specify a custom filter function in addition
+        to the topic. This is mostly useful when your filter is more complex than the
+        topic. The filter must accept an event as the only argument and return a
+        boolean. If the filter returns False on an event, the callback is not triggered.
+        See an example below.
+
+    !!! note "Callback extra arguments"
+
+        For each registered callback, you can also specify extra arguments to be passed to
+        the callback function in addition to the event. The extra arguments must be provided
+        as a list of values.
+
+    !!! note "Broadcast a callback to all states"
+
+        When registering a callback, you can specify if the callback is automatically
+        broadcast to all states. In this case, the first argument of the callback must be
+        the state otherwise it is the `GUI^` instance. The second argument is the event.
+        Optionally, the callback can accept more extra arguments (see the `callback_args`
+        argument).
+
+    !!! example
+
+        === "One callback to match all events"
+
+            ```python
+            from taipy import Event, GuiEventConsumer, Gui
+
+            def event_received(gui: Gui, event: Event):
+                print(f"Received event created at : {event.creation_date}")
+
+            if __name__ == "__main__":
+                event_consumer = GuiEventConsumer()
+                event_consumer.on_event(callback=event_received)
+                event_consumer.start()
+            ```
+
+        === "Two callbacks to match different topics"
+
+            ```python
+            from taipy import Event, GuiEventConsumer, Gui
+
+            def on_entity_creation(event: Event, gui: Gui):
+                print(f" {event.entity_type} entity created at {event.creation_date}")
+
+            def on_scenario(event: Event, gui: Gui):
+                print(f"Scenario '{event.entity_id}' processed for a '{event.operation}' operation.")
+
+            if __name__ == "__main__":
+                event_consumer = GuiEventConsumer()
+                event_consumer.on_event(callback=on_entity_creation, operation=EventOperation.CREATION)
+                event_consumer.on_event(callback=scenario_event, entity_type=EventEntityType.SCENARIO)
+                event_consumer.start()
+            ```
+
+        === "Callbacks to be broadcast to all states"
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui
+
+            def event_received(state, event: Event):
+                scenario = tp.get(event.entity_id)
+                print(f"Received event created at : {event.creation_date} for scenario '{scenario.name}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_event(callback=event_received)
+                event_consumer.start()
+                taipy.run(gui)
+            ```
+
+        === "Two callbacks for scenario creations"
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
+                print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+
+            def store_latest_scenario(state: State, event: Event, scenario: Scenario):
+                print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+                state.latest_scenario = scenario
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_scenario_created(callback=print_scenario_created)
+                event_consumer.broadcast_on_scenario_created(callback=store_latest_scenario)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        === "With specific filters"
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui
+
+            def cycle_filter(event: Event, gui: Gui):
+                scenario = tp.get(event.entity_id)
+                return scenario.cycle.name == "2023"
+
+            def event_received(state, event: Event):
+                scenario = tp.get(event.entity_id)
+                cycle = scenario.cycle
+                print(f"Received event for scenario '{scenario.name}' in cycle 'cycle.name'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_event(
+                    callback=event_received,
+                    entity_type=EventEntityType.SCENARIO,
+                    filter=cycle_filter)
+                event_consumer.start()
+                taipy.run(gui)
+            ```
+
+    Others methods such as `on_data_node_written()^` or `on_submission_finished()^` are
+    utility methods as shortcuts to easily register callbacks for predefined topics and
+    filters.
+    """
+
+    def __init__(self, gui: Optional[Gui] = None) -> None:
+        """Initialize the Gui Event Consumer service.
+
+        Arguments:
+            gui (Gui): The Gui instance used to broadcast the callbacks to all states.
+        """
+        self._registration = _Registration()
+        self._topic_callbacks_map: Dict[_Topic, List[_Callback]] = {}
+        self._gui = gui
+        self.event_processor: _AbstractEventProcessor = _EventProcessor()
+        if EnterpriseEditionUtils._using_enterprise():
+            self.event_processor = _load_fct(
+                EnterpriseEditionUtils._TAIPY_ENTERPRISE_EVENT_PACKAGE + "._event_processor",
+                "_AuthorizedEventProcessor",
+            )()
+        super().__init__(self._registration.registration_id, self._registration.queue)
+
+    def on_event(
+        self,
+        callback: Callable,
+        callback_args: Optional[List] = None,
+        entity_type: Optional[EventEntityType] = None,
+        entity_id: Optional[str] = None,
+        operation: Optional[EventOperation] = None,
+        attribute_name: Optional[str] = None,
+        filter: Optional[Callable[[Event], bool]] = None,
+    ) -> "GuiEventConsumer":
+        """Register a callback to be executed on a specific event.
+
+        Arguments:
+            callback (callable): The callback to be executed when the event is produced.
+                The callback takes the event as the first argument and the GUI instance as
+                the second argument.
+                ```python
+                def on_event_received(event: Event, gui: Gui):
+                    ...
+                ```
+                Optionally, the callback can accept extra arguments (see the `callback_args`
+                argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event and the GUI.
+            entity_type (Optional[EventEntityType]): The entity type of the event.
+                If None, the callback is registered for all entity types.
+            entity_id (Optional[str]): The entity id of the event.
+                If None, the callback is registered for all entities.
+            operation (Optional[EventOperation]): The operation of the event.
+                If None, the callback is registered for all operations.
+            attribute_name (Optional[str]): The attribute name of an update event.
+                If None, the callback is registered for all attribute names.
+            filter (Optional[Callable[[Event], bool]]): A custom filter to apply to
+                the event before triggering the callback. The filter must accept an event
+                as the only argument and return a boolean. If the filter returns False, the
+                callback is not triggered.
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_event(
+            callback=callback,
+            callback_args=callback_args,
+            entity_type=entity_type,
+            entity_id=entity_id,
+            operation=operation,
+            attribute_name=attribute_name,
+            filter=filter,
+            broadcast=False,
+        )
+
+    def broadcast_on_event(
+        self,
+        callback: Callable,
+        callback_args: Optional[List] = None,
+        entity_type: Optional[EventEntityType] = None,
+        entity_id: Optional[str] = None,
+        operation: Optional[EventOperation] = None,
+        attribute_name: Optional[str] = None,
+        filter: Optional[Callable[[Event], bool]] = None,
+    ) -> "GuiEventConsumer":
+        """Register a callback to be broadcast to all states on a specific event.
+
+                Arguments:
+                    callback (callable): The callback to be executed for each state when the
+                        event is produced. The callback takes the state as the first argument
+                        and the event as the second argument.
+                        ```python
+                        def on_event_received(state, event: Event):
+                            ...
+                        ```
+                        Optionally, the callback can accept extra arguments (see the `callback_args`
+                        argument).
+                    callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                        function in addition to the state and the event.
+                    entity_type (Optional[EventEntityType]): The entity type of the event.
+                        If None, the callback is registered for all entity types.
+                    entity_id (Optional[str]): The entity id of the event.
+                        If None, the callback is registered for all entities.
+                    operation (Optional[EventOperation]): The operation of the event.
+                        If None, the callback is registered for all operations.
+                    attribute_name (Optional[str]): The attribute name of an update event.
+                        If None, the callback is registered for all attribute names.
+                    filter (Optional[Callable[[Event], bool]]): A custom filter to apply to
+                        the event before triggering the callback. The filter must accept an event
+                        as the only argument and return a boolean. If the filter returns False, the
+                        callback is not triggered.
+                Returns:
+                    GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+                """
+        return self.__on_event(
+            callback=callback,
+            callback_args=callback_args,
+            entity_type=entity_type,
+            entity_id=entity_id,
+            operation=operation,
+            attribute_name=attribute_name,
+            filter=filter,
+            broadcast=True,
+        )
+
+    def __on_event(
+        self,
+        callback: Callable,
+        callback_args: Optional[List] = None,
+        entity_type: Optional[EventEntityType] = None,
+        entity_id: Optional[str] = None,
+        operation: Optional[EventOperation] = None,
+        attribute_name: Optional[str] = None,
+        filter: Optional[Callable[[Event], bool]] = None,
+        broadcast: bool = False,
+    ) -> "GuiEventConsumer":
+        topic = self.__build_topic(entity_type, entity_id, operation, attribute_name)
+        cb = self.__build_callback(callback, callback_args, filter, broadcast)
+        self.__register_callback(topic, cb)
+        return self
+
+    def on_scenario_created(self,
+                            callback: Callable,
+                            callback_args: Optional[List] = None,
+                            scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                            ) -> "GuiEventConsumer":
+        """ Register a callback for scenario creation events.
+
+        !!! Example:
+
+            === "A callback for all scenario creations"
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
+                print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_scenario_created(callback=print_scenario_created)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+            === "One callback for a specific scenario configuration"
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui
+
+            def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
+                print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                event_consumer = GuiEventConsumer()
+                event_consumer.on_scenario_created(callback=print_scenario_created, scenario_config="my_cfg")
+                event_consumer.start()
+                ...
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed when consuming the event.
+                ```python
+                def on_event_received(event: Event, scenario: Scenario, gui: Gui):
+                    ...
+                ```
+                The callback is triggered when a scenario is created. It takes the event
+                the scenario, and the GUI instance as arguments. It can also accept extra
+                arguments (see the `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the scenario and the GUI.
+            scenario_config (Union[str, ScenarioConfig, List, None]): The
+                optional scenario configuration ids or scenario configurations
+                for which the callback is registered. If None, the callback is registered
+                for all scenario configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_scenario_created(
+            callback=callback,
+            callback_args=callback_args,
+            scenario_config=scenario_config,
+            broadcast=False,
+        )
+
+    def broadcast_on_scenario_created(self,
+                                      callback: Callable,
+                                      callback_args: Optional[List] = None,
+                                      scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                                      ) -> "GuiEventConsumer":
+        """ Register a callback executed for all states on scenario creation events.
+
+        !!! Examples:
+
+                === "Two callbacks for all scenario creations"
+
+                ```python
+                import taipy as tp
+                from taipy import Event, GuiEventConsumer, Gui, State
+
+                def store_latest_scenario(state: State, event: Event, scenario: Scenario):
+                    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+                    state.latest_scenario = scenario
+
+                if __name__ == "__main__":
+                    gui = Gui()
+                    event_consumer = GuiEventConsumer(gui)
+                    event_consumer.broadcast_on_scenario_created(callback=store_latest_scenario)
+                    event_consumer.start()
+                    ...
+                    taipy.run(gui)
+                ```
+
+                === "One callback for a specific scenario configuration"
+
+                ```python
+                import taipy as tp
+                from taipy import Event, GuiEventConsumer, Gui
+
+                def scenario_created(state, event: Event, scenario: Scenario):
+                    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
+                    state.latest_scenario = scenario
+
+                if __name__ == "__main__":
+                    event_consumer = GuiEventConsumer()
+                    event_consumer.broadcast_on_scenario_created(callback=scenario_created, scenario_config="my_cfg")
+                    event_consumer.start()
+                    ...
+                ```
+
+        Arguments:
+            callback (callable):The callback to be executed for each state when
+                a scenario creation event occurs.
+                ```python
+                def on_event_received(state: State, event: Event, scenario: Scenario):
+                    ...
+                ```
+                The callback takes the state, the event, and the scenario as arguments.
+                Optionally, the callback can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, and the scenario.
+            scenario_config (Union[str, ScenarioConfig, List, None]): The
+                optional scenario configuration ids or scenario configurations
+                for which the callback is registered. If None, the callback is registered
+                for all scenario configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_scenario_created(
+            callback=callback,
+            callback_args=callback_args,
+            scenario_config=scenario_config,
+            broadcast=True,
+        )
+
+    def __on_scenario_created(self,
+                              callback: Callable,
+                              callback_args: Optional[List] = None,
+                              scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                              broadcast: bool = False,
+                              ) -> "GuiEventConsumer":
+        scenario_config = self.__format_configs_parameter(ScenarioConfig, scenario_config)
+
+        def _filter(event: Event) -> bool:
+            if not event.entity_id:
+                return False
+            import taipy as tp
+
+            sc = tp.get(event.entity_id)
+            if not isinstance(sc, Scenario):
+                return False
+            if scenario_config and sc.config_id not in scenario_config: # type: ignore[union-attr]
+                return False
+            event.metadata["predefined_args"] = [sc]
+            return True
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.SCENARIO,
+                        operation=EventOperation.CREATION,
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def on_scenario_deleted(self,
+                            callback: Callable,
+                            callback_args: Optional[List] = None,
+                            scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                            ) -> "GuiEventConsumer":
+        """ Register a callback for scenario deletion events.
+
+        !!! Example:
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def print_scenario_deleted(event: Event, scenario_id: str, gui: Gui):
+                print(f"A scenario has been deleted at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_scenario_deleted(callback=print_scenario_)
+                event_consumer.on_scenario_deleted(callback=print_scenario_deleted)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed on scenario deletion event.
+                ```python
+                def on_event_received(event: Event, scenario_id: str, gui: Gui):
+                    ...
+                ```
+                The callback takes the event, the scenario id, and the GUI instance as
+                arguments. Optionally, it can also accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the scenario id, and the GUI.
+            scenario_config (Union[str, ScenarioConfig, List, None]): The
+                optional scenario configuration ids or scenario configurations
+                for which the callback is registered. If None, the callback is registered
+                for all scenario configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_scenario_deleted(
+            callback=callback,
+            callback_args=callback_args,
+            scenario_config=scenario_config,
+            broadcast=False,
+        )
+
+    def broadcast_on_scenario_deleted(self,
+                                      callback: Callable,
+                                      callback_args: Optional[List] = None,
+                                      scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                                      ) -> "GuiEventConsumer":
+        """ Register a callback executed for all states on scenario deletion events.
+
+        !!! Example:
+
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+            from taipy.gui import notify
+
+            def on_scenario_deleted(state: State, event: Event, scenario_id: str):
+                notify(state, f"A scenario has been deleted at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_scenario_deleted(callback=on_scenario_deleted)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (Callable):The callback to be executed for each state on scenario
+                deletion event.
+                ```python
+                def on_event_received(state: State, event: Event, scenario_id: str):
+                    ...
+                ```
+                The callback takes the state, the event, and the scenario id as arguments.
+                Optionally, it can also accept extra arguments (see the `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, and the scenario id.
+            scenario_config (Union[str, ScenarioConfig, List, None]): The
+                optional scenario configuration ids or scenario configurations
+                for which the callback is registered. If None, the callback is registered
+                for all scenario configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_scenario_deleted(
+            callback=callback,
+            callback_args=callback_args,
+            scenario_config=scenario_config,
+            broadcast=True,
+        )
+
+    def __on_scenario_deleted(self,
+                              callback: Callable,
+                              callback_args: Optional[List] = None,
+                              scenario_config: Union[str, ScenarioConfig, List, None] = None,
+                              broadcast: bool = False
+                              ) -> "GuiEventConsumer":
+        scenario_config = self.__format_configs_parameter(ScenarioConfig, scenario_config)
+
+        def _filter(event: Event) -> bool:
+            if not scenario_config:
+                event.metadata["predefined_args"] = [event.entity_id]
+                return True
+            for cfg_id in scenario_config:
+                if cfg_id in str(event.entity_id):
+                    event.metadata["predefined_args"] = [event.entity_id]
+                    return True
+            return False
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.SCENARIO,
+                        operation=EventOperation.DELETION,
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def on_datanode_written(self,
+                            callback: Callable,
+                            callback_args: Optional[List] = None,
+                            datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                            ) -> "GuiEventConsumer":
+        """ Register a callback for data node written events.
+
+        The callback is triggered when a datanode is written (see methods
+        `Datanode.write()^` or `Datanode.append()^`).
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def last_data_edition(event: Event, datanode: Datanode, data: Any, gui: Gui):
+                print(f"Datanode written at '{event.creation_date}'.")
+                state.last_data_edition.append[datanode.id]
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_datanode_written(callback=last_data_edition, broadcast=True)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed when consuming the event.
+                ```python
+                def on_event_received(event: Event,
+                                      datanode: Datanode,
+                                      data: Any,
+                                      gui: Gui):
+                    ...
+                ```
+               The callback takes the event, the datanode, the data, and the GUI instance as
+                arguments. Optionally, the callback can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the datanode, the data, and the GUI.
+            datanode_config (Union[str, DataNodeConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_written(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=False,
+        )
+
+    def broadcast_on_datanode_written(self,
+                                      callback: Callable,
+                                      callback_args: Optional[List] = None,
+                                      datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                                      ) -> "GuiEventConsumer":
+        """ Register a callback for data node written events.
+
+        The callback is triggered when a datanode is written (see methods
+        `Datanode.write()^` or `Datanode.append()^`).
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def last_data_edition(state: State, event: Event, datanode: Datanode, data: Any):
+                print(f"Datanode written at '{event.creation_date}'.")
+                state.last_data_edition.append[datanode.id]
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_datanode_written(callback=last_data_edition)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable): The callback to be executed for all states on data node
+                written events.
+                ```python
+                def on_event_received(state: State, event: Event, datanode: Datanode, data: Any):
+                    ...
+                ```
+               The callback takes the state, the event, the datanode, the data, and the GUI
+                instance as arguments. Optionally, the callback can accept extra arguments
+                (see the `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, the datanode, and the data.
+            datanode_config (Union[str, DataNodeConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_written(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=True,
+        )
+
+    def __on_datanode_written(self,
+                              callback: Callable,
+                              callback_args: Optional[List] = None,
+                              datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                              broadcast: bool = False
+                              ) -> "GuiEventConsumer":
+        datanode_config = self.__format_configs_parameter(DataNodeConfig, datanode_config)
+
+        def _filter(event: Event) -> bool:
+            if not event.entity_id:
+                return False
+
+            import taipy as tp
+
+            dn = tp.get(event.entity_id)
+            if not isinstance(dn, DataNode):
+                return False
+            if datanode_config and dn.config_id not in datanode_config:
+                return False
+            event.metadata["predefined_args"] = [dn, dn.read()]
+            return True
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.DATA_NODE,
+                        operation=EventOperation.UPDATE,
+                        attribute_name="last_edit_date",
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def on_datanode_deleted(self,
+                            callback: Callable,
+                            callback_args: Optional[List] = None,
+                            datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                            ) -> "GuiEventConsumer":
+        """ Register a callback for data node deletion events.
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def on_deletions(event: Event, datanode_id: str, gui: Gui):
+                print(f"Datanode deleted at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_datanode_deleted(callback=record_deletions)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed when consuming the event.
+                ```python
+                def on_event_received(event: Event, datanode_id: str, gui: Gui):
+                    ...
+                ```
+                The callback takes the event, the datanode id, and the GUI instance as
+                arguments. Optionally, it can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the datanode id, and the GUI.
+            datanode_config (Union[str, DataNodeConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_deleted(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=False,
+        )
+
+    def broadcast_on_datanode_deleted(self,
+                                      callback: Callable,
+                                      callback_args: Optional[List] = None,
+                                      datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                                      ) -> "GuiEventConsumer":
+        """ Register a callback for each state on data node deletion events.
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def record_deletions(state: State, event: Event, datanode_id: str):
+                print(f"Datanode deleted at '{event.creation_date}'.")
+                state.deleted_datanodes.append[datanode_id]
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_datanode_deleted(callback=record_deletions)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable): The callback to be executed for each state on data node
+                deletion events.
+                ```python
+                def on_event_received(state: State, event: Event, datanode_id: str):
+                    ...
+                ```
+                The callback takes the state, the event, the datanode id, and the GUI
+                instance as arguments. Optionally, it can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, and the datanode id.
+            datanode_config (Union[str, DataNodeConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_deleted(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=True,
+        )
+
+    def __on_datanode_deleted(self,
+                              callback: Callable,
+                              callback_args: Optional[List] = None,
+                              datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                              broadcast: bool = False
+                              ) -> "GuiEventConsumer":
+        datanode_config = self.__format_configs_parameter(DataNodeConfig, datanode_config)
+
+        def _filter(event: Event) -> bool:
+            if not datanode_config:
+                event.metadata["predefined_args"] = [event.entity_id]
+                return True
+            for cfg_id in datanode_config:
+                if cfg_id in str(event.entity_id):
+                    event.metadata["predefined_args"] = [event.entity_id]
+                    return True
+            return False
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.DATA_NODE,
+                        operation=EventOperation.DELETION,
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def on_datanode_created(self,
+                            callback: Callable,
+                            callback_args: Optional[List] = None,
+                            datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                            ) -> "GuiEventConsumer":
+        """ Register a callback to be executed on data node creation event.
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def on_datanode_creations(event: Event, datanode: DataNode, gui: Gui):
+                print(f"Datanode created at '{event.creation_date}'.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_datanode_created(callback=record_creations)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed on data node creation events.
+                ```python
+                def on_event_received(event: Event, datanode: DataNode, gui: Gui):
+                    ...
+                ```
+                The callback takes the event, the datanode, and the GUI instance as
+                arguments. Optionally, the callback can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the datanode, and the GUI.
+            datanode_config (Union[str, ScenarioConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_created(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=False,
+        )
+
+    def broadcast_on_datanode_created(self,
+                                      callback: Callable,
+                                      callback_args: Optional[List] = None,
+                                      datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                                      ) -> "GuiEventConsumer":
+        """ Register a callback to be executed for each state on data node creation event.
+
+        !!! Examples:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+            from taipy.gui import notify
+
+            def on_datanode_creations(state: State, event: Event, datanode: DataNode):
+                print(f"Datanode created at '{event.creation_date}'.")
+                notify(state, f"Datanode '{datanode.id}' created.")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.broadcast_on_datanode_created(callback=record_creations)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable):The callback to be executed on data node creation events.
+                ```python
+                def on_event_received(state: State, event: Event, datanode: DataNode):
+                    ...
+                ```
+                The callback takes the state, the event, the datanode as arguments.
+                Optionally, the callback can accept extra arguments (see the
+                `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, and the datanode.
+            datanode_config (Union[str, ScenarioConfig, List, None]): The
+                optional datanode configuration ids or datanode configurations
+                for which the callback is registered. If None, the callback is registered
+                for all datanode configurations.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_datanode_created(
+            callback=callback,
+            callback_args=callback_args,
+            datanode_config=datanode_config,
+            broadcast=True,
+        )
+
+    def __on_datanode_created(self,
+                              callback: Callable,
+                              callback_args: Optional[List] = None,
+                              datanode_config: Union[str, DataNodeConfig, List, None] = None,
+                              broadcast: bool = False
+                              ) -> "GuiEventConsumer":
+        datanode_config = self.__format_configs_parameter(DataNodeConfig, datanode_config)
+
+        def _filter(event: Event) -> bool:
+            if not event.entity_id:
+                return False
+            import taipy as tp
+
+            dn = tp.get(event.entity_id)
+            if not isinstance(dn, DataNode):
+                return False
+            if datanode_config and dn.config_id not in datanode_config:
+                return False
+            event.metadata["predefined_args"] = [dn]
+            return True
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.DATA_NODE,
+                        operation=EventOperation.CREATION,
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def on_submission_finished(self,
+                               callback: Callable,
+                               callback_args: Optional[List] = None,
+                               config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
+                               ) -> "GuiEventConsumer":
+        """Register a callback for submission finished events.
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, GuiEventConsumer, Gui, State
+
+            def record_submissions(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
+                if submission.submission_status == SubmissionStatus.COMPLETED:
+                    print(f"Submission completed at '{event.creation_date}'. Status: '{submission.submission_status}'")
+                elif submission.submission_status == SubmissionStatus.FAILED:
+                    print(f"Submission failed at '{event.creation_date}'. Status: '{submission.submission_status}'")
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_submission_finished(callback=record_submissions)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable): The callback to be executed on submission finished
+                events.
+                ```python
+                def on_event_received(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
+                    ...
+                ```
+                The callback takes the event, the submittable (scenario, sequence or task),
+                the submission, and the GUI instance as arguments. Optionally, the
+                callback can accept extra arguments (see the `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the event, the submittable, the submission, and the GUI.
+            config_ids (Union[str, ScenarioConfig, TaskConfig, List, None]): The
+                optional scenario configuration ids or task configuration ids or scenario
+                configurations or task configurations for which the callback is registered.
+                If None, the callback is registered for any submittable.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_submission_finished(
+            callback=callback,
+            callback_args=callback_args,
+            config_ids=config_ids,
+            broadcast=False,
+        )
+
+    def broadcast_on_submission_finished(self,
+                                         callback: Callable,
+                                         callback_args: Optional[List] = None,
+                                         config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
+                                         ) -> "GuiEventConsumer":
+        """Register a callback to be executed for each state on submission finished events.
+
+        !!! Example:
+            ```python
+            import taipy as tp
+            from taipy import Event, EventConsumer, Gui, State
+
+            def record_submissions(state: State, event: Event, submittable: Submittable, submission: Submission):
+                print(f"Submission finished at '{event.creation_date}'. Status: '{submission.submission_status}'")
+                if submission.submission_status == SubmissionStatus.COMPLETED:
+                    state.completed.append[submittable.id]
+                elif submission.submission_status == SubmissionStatus.FAILED:
+                    state.failed.append[submittable.id]
+
+            if __name__ == "__main__":
+                gui = Gui()
+                event_consumer = GuiEventConsumer(gui)
+                event_consumer.on_submission_finished(callback=record_submissions, broadcast=True)
+                event_consumer.start()
+                ...
+                taipy.run(gui)
+            ```
+
+        Arguments:
+            callback (callable): The callback to be executed for each state on submission
+                finished events.
+                ```python
+                def on_event_received(state: State, event: Event, submittable: Submittable, submission: Submission):
+                    ...
+                ```
+                The callback takes the state, the event, the submittable (scenario, sequence
+                or task), and the submission. Optionally, the callback can accept extra
+                arguments (see the `callback_args` argument).
+            callback_args (List[AnyOf]): The extra arguments to be passed to the callback
+                function in addition to the state, the event, the submittable, and the
+                submission.
+            config_ids (Union[str, ScenarioConfig, TaskConfig, List, None]): The
+                optional scenario configuration ids or task configuration ids or scenario
+                configurations or task configurations for which the callback is registered.
+                If None, the callback is registered for any submittable.
+
+        Returns:
+            GuiEventConsumer: The current instance of the `GuiEventConsumer` service.
+        """
+        return self.__on_submission_finished(
+            callback=callback,
+            callback_args=callback_args,
+            config_ids=config_ids,
+            broadcast=True,
+        )
+
+    def __on_submission_finished(self,
+                                 callback: Callable,
+                                 callback_args: Optional[List] = None,
+                                 config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
+                                 broadcast: bool = False
+                                 ) -> "GuiEventConsumer":
+        if isinstance(config_ids, str):
+            config_ids = [config_ids]
+        if isinstance(config_ids, TaskConfig):
+            config_ids = [config_ids.id]
+        if isinstance(config_ids, ScenarioConfig):
+            config_ids = [config_ids.id]
+        if isinstance(config_ids, list):
+            res = []
+            for cfg in config_ids:
+                if isinstance(cfg, TaskConfig):
+                    res.append(cfg.id)
+                elif isinstance(cfg, ScenarioConfig):
+                    res.append(cfg.id)
+                else:
+                    res.append(cfg)
+            config_ids = res
+
+        def _filter(event: Event) -> bool:
+            finished_statuses = {SubmissionStatus.COMPLETED, SubmissionStatus.FAILED, SubmissionStatus.CANCELED}
+            if not event.entity_id or not event.attribute_value or event.attribute_value not in finished_statuses:
+                return False
+            import taipy as tp
+
+            submission = tp.get(event.entity_id)
+            if not isinstance(submission, Submission):
+                return False
+            if config_ids:
+                # We are filtering on a specific config
+                if not submission.entity_config_id:
+                    # It is a submission for a sequence that does not have configs
+                    return False
+                if submission.entity_config_id not in config_ids:
+                    # It is a submission for a config that is not in the list
+                    return False
+
+            submittable = tp.get(submission.entity_id) # type: ignore[arg-type]
+            event.metadata["predefined_args"] = [submittable, submission]
+            return True
+
+        self.__on_event(callback=callback,
+                        callback_args=callback_args,
+                        entity_type=EventEntityType.SUBMISSION,
+                        operation=EventOperation.UPDATE,
+                        attribute_name="submission_status",
+                        filter=_filter,
+                        broadcast=broadcast)
+        return self
+
+    def process_event(self, event: Event) -> None:
+        """Process an event.
+
+        This method is responsible for processing the incoming event.
+
+        Args:
+            event (Event): The event to be processed.
+        """
+        self.event_processor.process_event(self, event)
+
+    def start(self):
+        """Start the event consumer thread."""
+        Notifier._register_from_registration(self._registration)
+        super().start()
+
+    def stop(self):
+        """Stop the event consumer thread."""
+        super().stop()
+        Notifier.unregister(self._registration.registration_id)
+
+    @staticmethod
+    def __format_configs_parameter(clazz, parameter) -> List[str]:
+        if isinstance(parameter, str):
+            parameter = [parameter]
+        if isinstance(parameter, clazz):
+            parameter = [parameter.id]  # type: ignore[attr-defined]
+        if isinstance(parameter, list):
+            parameter = [cfg.id if isinstance(cfg, clazz) else cfg for cfg in parameter]  # type: ignore[attr-defined]
+        return parameter
+
+    def __build_topic(self, entity_type, entity_id, operation, attribute_name):
+        topic = _Topic(entity_type, entity_id, operation, attribute_name)
+        self._registration.add_topic(
+            entity_type=topic.entity_type,
+            entity_id=topic.entity_id,
+            operation=topic.operation,
+            attribute_name=topic.attribute_name
+        )
+        return topic
+
+    def __build_callback(self, callback, callback_args, filter, broadcast):
+        if broadcast and self._gui is None:
+            _TaipyLogger._get_logger().error(
+                "A callback is set to be broadcast to all states of "
+                "the GUI but no GUI is provided to the event consumer."
+            )
+            raise NoGuiDefinedInEventConsumer()
+        if callback_args is None:
+            callback_args = []
+        cb = _Callback(callback, args=callback_args, broadcast=broadcast, filter=filter)
+        return cb
+
+    def __register_callback(self, topic, cb):
+        if self._topic_callbacks_map.get(topic) is None:
+            self._topic_callbacks_map[topic] = []
+        self._topic_callbacks_map[topic].append(cb)
+
+    def _process_event(self, event: Event) -> None:
+        for topic, cbs in self._topic_callbacks_map.items():
+            if Notifier._is_matching(event, topic):
+                for cb in cbs:
+                    if not cb.filter or cb.filter(event):
+                        self.__do_process(cb, event)
+
+    def __do_process(self, cb, event: Event) -> None:
+        predefined_args = event.metadata.pop("predefined_args", [])
+        if cb.broadcast:
+            if not self._gui:
+                _TaipyLogger._get_logger().error(
+                    "A callback is set to be broadcast to all states of "
+                    "the GUI but no GUI is provided to the event consumer."
+                )
+                return
+            self._gui.broadcast_callback(cb.callback, [event, *predefined_args, *cb.args])
+        else:
+            cb.callback(event, *predefined_args, self._gui, *cb.args)

+ 16 - 0
taipy/exceptions/__init__.py

@@ -0,0 +1,16 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+"""Exceptions raised by taipy package functionalities."""
+
+class NoGuiDefinedInEventConsumer(Exception):
+    """Raised when an on event callback is registered to be broadcast to all states,
+    but no GUI is defined in the event consumer."""

+ 3 - 3
taipy/gui_core/_context.py

@@ -52,8 +52,8 @@ from taipy.core import get as core_get
 from taipy.core import submit as core_submit
 from taipy.core.data._file_datanode_mixin import _FileDataNodeMixin
 from taipy.core.data.data_node_id import EDIT_COMMENT_KEY, EDIT_EDITOR_ID_KEY, EDIT_JOB_ID_KEY, EDIT_TIMESTAMP_KEY
-from taipy.core.notification import CoreEventConsumerBase, EventEntityType
-from taipy.core.notification.event import Event, EventOperation
+from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
+from taipy.core.notification.event import Event, EventEntityType, EventOperation
 from taipy.core.notification.notifier import Notifier
 from taipy.core.reason import ReasonCollection
 from taipy.core.submission.submission_status import SubmissionStatus
@@ -73,7 +73,7 @@ from ._utils import _ClientStatus
 from .filters import CustomScenarioFilter, ParamType
 
 
-class _GuiCoreContext(CoreEventConsumerBase):
+class _GuiCoreContext(_CoreEventConsumerBase):
     __PROP_ENTITY_ID = "id"
     __PROP_ENTITY_COMMENT = "comment"
     __PROP_CONFIG_ID = "config"

+ 4 - 4
tests/core/notification/test_core_event_consumer.py

@@ -14,14 +14,14 @@ from queue import SimpleQueue
 from taipy.common.config import Config
 from taipy.core import taipy as tp
 from taipy.core.common.frequency import Frequency
-from taipy.core.notification.core_event_consumer import CoreEventConsumerBase
+from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
 from taipy.core.notification.event import Event, EventEntityType, EventOperation
 from taipy.core.notification.notifier import Notifier
 from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
 from tests.core.utils import assert_true_after_time
 
 
-class AllCoreEventConsumerProcessor(CoreEventConsumerBase):
+class AllCoreEventConsumerProcessor(_CoreEventConsumerBase):
     def __init__(self, registration_id: str, queue: SimpleQueue):
         self.event_collected = 0
         self.event_entity_type_collected: dict = {}
@@ -36,7 +36,7 @@ class AllCoreEventConsumerProcessor(CoreEventConsumerBase):
         self.event_operation_collected[event.operation] = self.event_operation_collected.get(event.operation, 0) + 1
 
 
-class ScenarioCoreEventConsumerProcessor(CoreEventConsumerBase):
+class ScenarioCoreEventConsumerProcessor(_CoreEventConsumerBase):
     def __init__(self, registration_id: str, queue: SimpleQueue):
         self.scenario_event_collected = 0
         self.event_operation_collected: dict = {}
@@ -47,7 +47,7 @@ class ScenarioCoreEventConsumerProcessor(CoreEventConsumerBase):
         self.event_operation_collected[event.operation] = self.event_operation_collected.get(event.operation, 0) + 1
 
 
-class TaskCreationCoreEventConsumerProcessor(CoreEventConsumerBase):
+class TaskCreationCoreEventConsumerProcessor(_CoreEventConsumerBase):
     def __init__(self, registration_id: str, queue: SimpleQueue):
         self.task_event_collected = 0
         self.creation_event_operation_collected = 0

+ 2 - 2
tests/core/notification/test_events_published.py

@@ -19,7 +19,7 @@ from taipy.common.config import Config
 from taipy.core import taipy as tp
 from taipy.core.common.frequency import Frequency
 from taipy.core.job.status import Status
-from taipy.core.notification.core_event_consumer import CoreEventConsumerBase
+from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
 from taipy.core.notification.event import Event, EventEntityType, EventOperation
 from taipy.core.notification.notifier import Notifier
 from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
@@ -49,7 +49,7 @@ class Snapshot:
                 self.attr_value_collected[event.attribute_name] = [event.attribute_value]
 
 
-class RecordingConsumer(CoreEventConsumerBase):
+class RecordingConsumer(_CoreEventConsumerBase):
     """
     A straightforward and no-thread core events consumer that allows to
     capture snapshots of received events.

+ 1 - 1
tests/core/notification/test_notifier.py

@@ -11,10 +11,10 @@
 
 from queue import SimpleQueue
 
+from taipy import Frequency
 from taipy.common.config import Config
 from taipy.core import taipy as tp
 from taipy.core._version._version_manager_factory import _VersionManagerFactory
-from taipy.core.common.frequency import Frequency
 from taipy.core.notification import EventEntityType, EventOperation, _Registration
 from taipy.core.notification._topic import _Topic
 from taipy.core.notification.event import Event

+ 14 - 13
tests/core/sequence/test_sequence_manager.py

@@ -791,19 +791,20 @@ def test_hard_delete_one_single_sequence_with_cycle_data_nodes():
 
 
 def test_hard_delete_shared_entities():
-    input_dn = Config.configure_data_node("my_input", "in_memory", scope=Scope.SCENARIO, default_data="testing")
-    intermediate_dn = Config.configure_data_node("my_inter", "in_memory", scope=Scope.GLOBAL, default_data="testing")
-    output_dn = Config.configure_data_node("my_output", "in_memory", scope=Scope.GLOBAL, default_data="testing")
+    input_dn = Config.configure_data_node("my_input", "in_memory", default_data="testing")
+    intermediate_dn = Config.configure_data_node("my_inter", "in_memory")
+    output_dn = Config.configure_data_node("my_output", "in_memory")
     task_1 = Config.configure_task("task_1", print, input_dn, intermediate_dn)
     task_2 = Config.configure_task("task_2", print, intermediate_dn, output_dn)
 
-    tasks_scenario_1 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_1")
-    tasks_scenario_2 = _TaskManager._bulk_get_or_create([task_1, task_2], scenario_id="scenario_id_2")
+    scenario_config = Config.configure_scenario("sc", [task_1, task_2])
+    import taipy as tp
+
+    scenario_1 = tp.create_scenario(scenario_config, name="scenario_1")
+    scenario_1.add_sequence("sequence", [scenario_1.task_1, scenario_1.task_2])
+    scenario_2 = tp.create_scenario(scenario_config, name="scenario_2")
+    scenario_2.add_sequence("sequence", [scenario_2.task_1, scenario_2.task_2])
 
-    scenario_1 = Scenario("scenario_1", tasks_scenario_1, {}, sequences={"sequence": {"tasks": tasks_scenario_1}})
-    scenario_2 = Scenario("scenario_2", tasks_scenario_2, {}, sequences={"sequence": {"tasks": tasks_scenario_2}})
-    _ScenarioManager._repository._save(scenario_1)
-    _ScenarioManager._repository._save(scenario_2)
     sequence_1 = scenario_1.sequences["sequence"]
     sequence_2 = scenario_2.sequences["sequence"]
 
@@ -812,14 +813,14 @@ def test_hard_delete_shared_entities():
 
     assert len(_ScenarioManager._get_all()) == 2
     assert len(_SequenceManager._get_all()) == 2
-    assert len(_TaskManager._get_all()) == 3
-    assert len(_DataManager._get_all()) == 4
+    assert len(_TaskManager._get_all()) == 4
+    assert len(_DataManager._get_all()) == 6
     assert len(_JobManager._get_all()) == 4
     _SequenceManager._hard_delete(sequence_1.id)
     assert len(_ScenarioManager._get_all()) == 2
     assert len(_SequenceManager._get_all()) == 1
-    assert len(_TaskManager._get_all()) == 3
-    assert len(_DataManager._get_all()) == 4
+    assert len(_TaskManager._get_all()) == 4
+    assert len(_DataManager._get_all()) == 6
     assert len(_JobManager._get_all()) == 4
 
 

+ 10 - 0
tests/event/__init__.py

@@ -0,0 +1,10 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.

+ 100 - 0
tests/event/conftest.py

@@ -0,0 +1,100 @@
+# Copyright 2021-2024 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+from datetime import datetime
+from unittest.mock import patch
+
+import pytest
+
+from taipy import Frequency, Scope
+from taipy.core.cycle.cycle import Cycle
+from taipy.core.cycle.cycle_id import CycleId
+from taipy.core.data.in_memory import InMemoryDataNode
+from taipy.core.notification.notifier import Notifier
+from taipy.core.scenario.scenario import Scenario
+from taipy.core.scenario.scenario_id import ScenarioId
+from taipy.core.sequence.sequence import Sequence
+from taipy.core.sequence.sequence_id import SequenceId
+from taipy.core.submission.submission import Submission
+
+current_time = datetime.now()
+
+@pytest.fixture(scope="function")
+def current_datetime():
+    return current_time
+
+
+@pytest.fixture(scope="function")
+def data_node():
+    return InMemoryDataNode(
+        "data_node",
+        Scope.SCENARIO,
+        version="random_version_number",
+        properties={"default_data": "foo"},
+    )
+
+
+@pytest.fixture(scope="function")
+def scenario(cycle):
+    return Scenario(
+        "sc",
+        set(),
+        {},
+        set(),
+        ScenarioId("SCENARIO_sc_id"),
+        current_time,
+        is_primary=False,
+        tags={"foo"},
+        cycle=None,
+        version="random_version_number",
+    )
+
+
+@pytest.fixture(scope="function")
+def cycle():
+    example_date = datetime.fromisoformat("2021-11-11T11:11:01.000001")
+    return Cycle(
+        Frequency.DAILY,
+        {},
+        creation_date=example_date,
+        start_date=example_date,
+        end_date=example_date,
+        name="cc",
+        id=CycleId("cc_id"),
+    )
+
+
+@pytest.fixture(scope="function")
+def sequence(scenario):
+    return Sequence({}, [], SequenceId(f"SEQUENCE_sequence_{scenario.id}"), version="random_version_number")
+
+
+@pytest.fixture(scope="function")
+def submission():
+    return Submission("entity_id", "entity_type")
+
+
+@pytest.fixture(scope="function", autouse=True)
+def init(init_notifier):
+    init_notifier()
+
+    with patch("sys.argv", ["prog"]):
+        yield
+
+    init_notifier()
+
+
+@pytest.fixture
+def init_notifier():
+    def _init_notifier():
+        Notifier._topics_registrations_list = {}
+
+    return _init_notifier

+ 165 - 0
tests/event/test_consumer__on_datanode_created.py

@@ -0,0 +1,165 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import DataNode, Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, datanode: DataNode):
+    ...
+
+
+def cb_1(event: Event, datanode: DataNode, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: DataNode):
+    ...
+
+
+def test_on_datanode_created(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0)
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # No config provided, so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node]
+
+
+def test_on_datanode_created_multiple_configs(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node]
+
+
+def test_on_datanode_created_multiple_configs_no_matching(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_created(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION,
+                      entity_id=data_node.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert not f_val  # DataNode is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_created_with_args_and_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_created(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata.get("predefined_args") == [data_node]
+
+
+def test_on_datanode_created_with_args_and_not_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_created(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.CREATION, entity_id=data_node.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is False  # datanode is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_created_with_broadcast():
+    consumer = GuiEventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_datanode_created(callback=cb_for_state)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 149 - 0
tests/event/test_consumer__on_datanode_deleted.py

@@ -0,0 +1,149 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, datanode: str):
+    ...
+
+
+def cb_1(event: Event, datanode: str, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: str):
+    ...
+
+
+def test_on_datanode_deleted(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0)
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    operation=EventOperation.DELETION,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # No config provided, so the datanode passes the filter
+        assert event.metadata["predefined_args"] == [data_node.id]
+
+
+def test_on_datanode_deleted_multiple_configs(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+        assert event.metadata["predefined_args"] == [data_node.id]
+
+
+def test_on_datanode_deleted_multiple_configs_no_matching(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION,
+                      entity_id=data_node.id)
+        assert actual_filter is not None
+        f_val = actual_filter(event)
+        assert not f_val  # Datanode is not from any of the provided configs, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_deleted_with_args_and_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        filter_value = actual_filter(event)
+        assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+        assert event.metadata.get("predefined_args") == [data_node.id]
+
+
+def test_on_datanode_deleted_with_args_and_not_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_deleted(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE, operation=EventOperation.DELETION, entity_id=data_node.id)
+        filter_value = actual_filter(event)
+        assert filter_value is False  # The datanode is not from WRONG_CFG, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_deleted_with_broadcast(data_node):
+    consumer = GuiEventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_datanode_deleted(callback=cb_for_state)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=True)

+ 186 - 0
tests/event/test_consumer__on_datanode_written.py

@@ -0,0 +1,186 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from typing import Any
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import DataNode, Gui
+from taipy.core.config import DataNodeConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, datanode: DataNode, data: Any):
+    ...
+
+
+def cb_1(event: Event, datanode: DataNode, data: Any, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, datanode: DataNode, data: Any):
+    ...
+
+
+def test_on_datanode_written(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0)
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # No config provided, so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_multiple_configs(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1", DataNodeConfig("dn2"), "data_node"])
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True  # The datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_multiple_configs_no_matching(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_written(callback=cb_0,
+                                     datanode_config=[DataNodeConfig("dn0"), "dn1"])
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert not f_val  # Datanode is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_datanode_written_with_args_and_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_written(callback=cb_1, callback_args=["foo"], datanode_config="data_node")
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is True # datanode is from config 'data_node', so the datanode passes the filter
+            assert event.metadata["predefined_args"] == [data_node, data_node.read()]
+
+
+def test_on_datanode_written_with_args_and_not_matching_config(data_node):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_datanode_written(callback=cb_1, callback_args=["foo"], datanode_config="WRONG_CFG")
+        # test the on_datanode_written method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.DATA_NODE,
+                      operation=EventOperation.UPDATE,
+                      entity_id=data_node.id,
+                      attribute_name="last_edit_date")
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = data_node
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(data_node.id)
+            assert filter_value is False  # datanode is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the datanode in the metadata
+
+
+def test_on_datanode_written_with_broadcast(data_node):
+    consumer = GuiEventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_datanode_written(callback=cb_for_state)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.DATA_NODE,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="last_edit_date",
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 93 - 0
tests/event/test_consumer__on_event.py

@@ -0,0 +1,93 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+
+import pytest
+
+from taipy import Gui
+from taipy.core.notification import Event, EventEntityType, EventOperation, _Topic
+from taipy.event._event_callback import _Callback
+from taipy.event.event_consumer import GuiEventConsumer
+from taipy.exceptions import NoGuiDefinedInEventConsumer
+
+
+def cb_0(event: Event, extra:str):
+    ...
+
+
+def cb_1(event: Event):
+    ...
+
+
+def cb_2(event: Event):
+    ...
+
+
+def cb_for_state(state, event: Event):
+    ...
+
+
+def test_on_event():
+    consumer = GuiEventConsumer()
+    consumer.on_event(callback=cb_0, callback_args=["foo"])
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SCENARIO)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    consumer.on_event(callback=cb_0, callback_args=["baz"], operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_0, callback_args=["qux"], entity_type=EventEntityType.SEQUENCE,
+                      operation=EventOperation.SUBMISSION)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO) # duplicate topic
+
+    assert consumer._registration is not None
+    registration = consumer._registration
+    assert registration.registration_id is not None
+    assert registration.queue is not None
+    assert len(registration.topics) == 5  # 5 unique topics
+    topic_1 = _Topic()
+    topic_2 = _Topic(entity_type=EventEntityType.SCENARIO)
+    topic_3 = _Topic(entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    topic_4 = _Topic(operation=EventOperation.CREATION)
+    topic_5 = _Topic(entity_type=EventEntityType.SEQUENCE, operation=EventOperation.SUBMISSION)
+    assert topic_1 in registration.topics
+    assert topic_2 in registration.topics
+    assert topic_3 in registration.topics
+    assert topic_4 in registration.topics
+    assert topic_5 in registration.topics
+
+    assert consumer._gui is None
+
+    assert len(consumer._topic_callbacks_map) == 5  # 5 unique topics
+    assert topic_1 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_1] == [_Callback(cb_0, ["foo"])]
+    assert topic_2 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_2] == [_Callback(cb_1), _Callback(cb_2)]
+    assert topic_3 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_3] == [_Callback(cb_2)]
+    assert topic_4 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_4] == [_Callback(cb_0, ["baz"])]
+    assert topic_5 in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic_5] == [_Callback(cb_0, ["qux"])]
+
+
+def test_on_event_for_state():
+    consumer = GuiEventConsumer(gui=Gui())
+    consumer.broadcast_on_event(callback=cb_for_state)
+
+    assert consumer._gui is not None
+    assert len(consumer._topic_callbacks_map) == 1
+    topic = _Topic()
+    assert topic in consumer._topic_callbacks_map
+    assert consumer._topic_callbacks_map[topic] == [_Callback(cb_for_state, broadcast=True)]
+
+
+def test_on_event_missing_gui():
+    consumer = GuiEventConsumer()
+    with pytest.raises(NoGuiDefinedInEventConsumer):
+        consumer.broadcast_on_event(callback=cb_for_state)
+    assert len(consumer._topic_callbacks_map) == 0

+ 165 - 0
tests/event/test_consumer__on_scenario_created.py

@@ -0,0 +1,165 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui, Scenario
+from taipy.core.config import ScenarioConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, scenario: Scenario):
+    ...
+
+
+def cb_1(event: Event, scenario: Scenario, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, scenario: Scenario):
+    ...
+
+
+def test_on_scenario_created(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0)
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True  # No config provided, so the scenario passes the filter
+            assert event.metadata["predefined_args"] == [scenario]
+
+
+def test_on_scenario_created_multiple_configs(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1", ScenarioConfig("sc_2"), "sc"])
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True  # The scenario is from config 'sc', so the scenario passes the filter
+            assert event.metadata["predefined_args"] == [scenario]
+
+
+def test_on_scenario_created_multiple_configs_no_matching(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_created(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1"])
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION,
+                      entity_id=scenario.id)
+        assert actual_filter is not None
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            f_val = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert not f_val  # Scenario is not from any of the provided configs, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_on_scenario_created_with_args_and_matching_config(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_created(callback=cb_1, callback_args=["foo"], scenario_config="sc")
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is True # scenario is from config 'sc', so the scenario passes the filter
+            assert event.metadata.get("predefined_args") == [scenario]
+
+
+def test_on_scenario_created_with_args_and_not_matching_config(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_created(callback=cb_1, callback_args=["foo"], scenario_config="WRONG_CFG")
+        # test the on_scenario_created method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION, entity_id=scenario.id)
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.return_value = scenario
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(scenario.id)
+            assert filter_value is False  # scenario is not from WRONG_CFG, so it should not pass the filter
+            assert event.metadata.get("predefined_args") is None # No need to cache the scenario in the metadata
+
+
+def test_on_scenario_created_with_broadcast():
+    consumer = GuiEventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_scenario_created(callback=cb_for_state)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.CREATION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 150 - 0
tests/event/test_consumer__on_scenario_deleted.py

@@ -0,0 +1,150 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui
+from taipy.core.config import ScenarioConfig
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, scenario: str):
+    ...
+
+
+def cb_1(event: Event, scenario: str, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, scenario: str):
+    ...
+
+
+def test_on_scenario_deleted(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0)
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # No config provided, so the scenario passes the filter
+        assert event.metadata["predefined_args"] == [scenario.id]
+
+
+def test_on_scenario_deleted_multiple_configs(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1", ScenarioConfig("sc_2"), "sc"])
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        assert actual_filter is not None
+        filter_value = actual_filter(event)
+        assert filter_value is True  # The scenario is from config 'sc', so the scenario passes the filter
+        assert event.metadata["predefined_args"] == [scenario.id]
+
+
+def test_on_scenario_deleted_multiple_configs_no_matching(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_0,
+                                     scenario_config=[ScenarioConfig("sc_0"), "sc_1"])
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION,
+                      entity_id=scenario.id)
+        assert actual_filter is not None
+        f_val = actual_filter(event)
+        assert not f_val  # Scenario is not from any of the provided configs, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_on_scenario_deleted_with_args_and_matching_config(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_1, callback_args=["foo"], scenario_config="sc")
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        filter_value = actual_filter(event)
+        assert filter_value is True # scenario is from config 'sc', so the scenario passes the filter
+        assert event.metadata.get("predefined_args") == [scenario.id]
+
+
+def test_on_scenario_deleted_with_args_and_not_matching_config(scenario):
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_scenario_deleted(callback=cb_1, callback_args=["foo"], scenario_config="WRONG_CFG")
+        # test the on_scenario_deleted method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["foo"],
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SCENARIO, operation=EventOperation.DELETION, entity_id=scenario.id)
+        filter_value = actual_filter(event)
+        assert filter_value is False  # scenario is not from WRONG_CFG, so it should not pass the filter
+        assert event.metadata.get("predefined_args") is None # No need to cache the scenario in the metadata
+
+
+def test_on_scenario_deleted_with_broadcast(scenario):
+    consumer = GuiEventConsumer(Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_scenario_deleted(callback=cb_for_state)
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SCENARIO,
+                                    operation=EventOperation.DELETION,
+                                    filter=ANY,
+                                    broadcast=True)
+

+ 284 - 0
tests/event/test_consumer__on_submission_finished.py

@@ -0,0 +1,284 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from unittest import mock
+from unittest.mock import ANY
+
+from taipy import Gui, Scenario, Submission, SubmissionStatus
+from taipy.core.notification import Event, EventEntityType, EventOperation
+from taipy.event.event_consumer import GuiEventConsumer
+
+
+def cb_0(event: Event, submittable: Scenario, submission: Submission):
+    ...
+
+
+def cb_1(event: Event, submittable: Scenario, submission: Submission, extra:str):
+    ...
+
+
+def cb_for_state(state, event: Event, submittable: Scenario, submission: Submission):
+    ...
+
+
+def test_on_scenario_submission_finished(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.FAILED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                        attribute_value=SubmissionStatus.CANCELED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls(calls=[mock.call(submission.id), mock.call(scenario.id)], any_order=False)
+            assert filter_value is True  # No config provided, so the event passes the filter
+            assert event.metadata["predefined_args"] == [scenario, submission]
+
+
+def test_filter_false__wrong_status(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        # No status value
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      )
+        filter_value = actual_filter(event)
+        assert filter_value is False  # no status value
+        assert event.metadata.get("predefined_args") is None
+
+        # wrong status
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.BLOCKED,
+                      )
+        filter_value = actual_filter(event)
+        assert filter_value is False  # status is not finished
+        assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_false__config_ids_and_sequence(scenario, sequence, submission):
+    submission._entity_id = sequence.id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=scenario.config_id)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(submission.id)
+            assert filter_value is False  # Sequence submission do not have config so the event does not pass the filter
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_false__not_matching_config_ids(scenario, submission):
+    submission._entity_id = scenario.id
+    submission._entity_config_id = scenario.config_id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=["NOT_MATCHING_CONFIG_ID"])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_called_once_with(submission.id)
+            # Submission config id is not in the provided list so the event does not pass the filter
+            assert filter_value is False
+            assert event.metadata.get("predefined_args") is None
+
+
+def test_filter_true__with_config(scenario, submission):
+    submission._entity_id = scenario.id
+    submission._entity_config_id = scenario.config_id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0, config_ids=["scenario_cfg", scenario.config_id])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls([mock.call(submission.id), mock.call(scenario.id)])
+            assert filter_value is True
+            assert event.metadata.get("predefined_args") == [scenario, submission]
+
+
+def test_filter_true__without_config(scenario, submission):
+    submission._entity_id = scenario.id
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_0)
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_0,
+                                    callback_args=None,
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+        # check the filter method is correct
+        actual_filter = mck.call_args.kwargs["filter"]
+        assert actual_filter is not None
+
+        event = Event(entity_type=EventEntityType.SUBMISSION,
+                      operation=EventOperation.UPDATE,
+                      entity_id=submission.id,
+                      attribute_name="submission_status",
+                      attribute_value=SubmissionStatus.COMPLETED,
+                      )
+        with (mock.patch("taipy.get") as mck_get):
+            mck_get.side_effect = [submission, scenario]
+            filter_value = actual_filter(event)
+            mck_get.assert_has_calls([mock.call(submission.id), mock.call(scenario.id)])
+            assert filter_value is True
+            assert event.metadata.get("predefined_args") == [scenario, submission]
+
+
+def test_on_scenario_submission_finished_with_args():
+    consumer = GuiEventConsumer()
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.on_submission_finished(callback=cb_1, callback_args=["extra"])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_1,
+                                    callback_args=["extra"],
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=False)
+
+
+def test_on_scenario_submission_finished_with_args_and_state():
+    consumer = GuiEventConsumer(gui=Gui())
+    with mock.patch("taipy.event.event_consumer.GuiEventConsumer._GuiEventConsumer__on_event") as mck:
+        consumer.broadcast_on_submission_finished(callback=cb_for_state, callback_args=["extra"])
+        # test the on_submission_finished method delegates to on_event with the correct parameters
+        mck.assert_called_once_with(callback=cb_for_state,
+                                    callback_args=["extra"],
+                                    entity_type=EventEntityType.SUBMISSION,
+                                    operation=EventOperation.UPDATE,
+                                    attribute_name="submission_status",
+                                    filter=ANY,
+                                    broadcast=True)

+ 223 - 0
tests/event/test_consumer__process_event.py

@@ -0,0 +1,223 @@
+# Copyright 2021-2025 Avaiga Private Limited
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations under the License.
+from typing import Any, Dict, List, Optional
+from unittest import mock
+
+from taipy import Gui, Scenario
+from taipy.core.notification import Event, EventEntityType, EventOperation, _Topic
+from taipy.event._event_callback import _Callback
+from taipy.event.event_consumer import GuiEventConsumer
+
+collector: Dict[str, Any] = {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
+            "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}
+args_collector: Dict[str, List] = {}
+
+
+def init_collector():
+    return {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
+            "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}, {}
+
+
+def cb_0(event: Event, gui: Optional[Gui], extra:str):
+    collector["cb_0"]+=1
+    if not args_collector.get("cb_0"):
+        args_collector["cb_0"] = [extra]
+    else:
+        args_collector["cb_0"].append(extra)
+    print(f"event created at {event.creation_date} triggered callback cb_0.")  # noqa: T201
+
+
+def cb_1(event: Event, gui: Optional[Gui]):
+    collector["cb_1"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_1.")  # noqa: T201
+
+
+def cb_2(event: Event, gui: Gui,):
+    collector["cb_2"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_2.")  # noqa: T201
+
+
+def cb_3(event: Event, gui: Gui, ):
+    collector["cb_3"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_3.")  # noqa: T201
+
+
+def cb_for_state(state, event: Event):
+    collector["cb_for_state"]+=1
+    print(f"event created at {event.creation_date} triggered callback cb_for_state.")  # noqa: T201
+
+
+def cb_scenario_creation(event: Event, scenario: Scenario, gui: Gui, extra_arg: str):
+    collector["cb_scenario_creation"]+=1
+    print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.")  # noqa: T201
+
+
+def cb_scenario_creation_with_state(state, event: Event, scenario: Scenario, extra_arg: str):
+    collector["cb_scenario_creation_with_state"]+=1
+    print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.")  # noqa: T201
+
+
+
+def test_process_event(scenario):
+    global collector
+    global args_collector
+    consumer = GuiEventConsumer()
+    consumer.on_event(callback=cb_0, callback_args=["foo"])
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SCENARIO)
+    consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO, entity_id="bar")
+    consumer.on_event(callback=cb_3, operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_0, callback_args=["baz"], operation=EventOperation.CREATION)
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.SEQUENCE, operation=EventOperation.SUBMISSION)
+    consumer.on_event(callback=cb_1, entity_type=EventEntityType.JOB,
+                      operation=EventOperation.UPDATE, attribute_name="status")
+
+    collector, args_collector = init_collector()
+    event_1 = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="bar",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    consumer.process_event(event_1)
+
+    assert collector["cb_0"] == 2
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 1
+    assert collector["cb_3"] == 1
+
+    collector, args_collector = init_collector()
+    event_2 = Event(
+        entity_type=EventEntityType.SEQUENCE,
+        operation=EventOperation.SUBMISSION,
+        entity_id="quux",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    consumer.process_event(event_2)
+
+    assert collector["cb_0"] == 1
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 0
+    assert collector["cb_3"] == 0
+    collector, args_collector = init_collector()
+
+    collector, args_collector = init_collector()
+    event_3 = Event(
+        entity_type=EventEntityType.JOB,
+        operation=EventOperation.UPDATE,
+        entity_id="corge",
+        attribute_name="status",
+        attribute_value="COMPLETED",
+        metadata={},
+    )
+    consumer.process_event(event_3)
+
+    assert collector["cb_0"] == 1
+    assert collector["cb_1"] == 1
+    assert collector["cb_2"] == 0
+    assert collector["cb_3"] == 0
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_state():
+    consumer = GuiEventConsumer(gui=Gui())
+    consumer.broadcast_on_event(callback=cb_for_state)
+
+    event_1 = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={},
+    )
+    with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
+        consumer.process_event(event_1)
+        mock_broadcast.assert_called_once_with(cb_for_state, [event_1])
+
+
+def test_process_event_with_filter():
+    global collector
+    global args_collector
+    def filt(event: Event) -> bool:
+        return event.metadata.get("foo") == "bar"
+
+    consumer = GuiEventConsumer()
+    consumer.on_event(callback=cb_0,
+                      callback_args=["foo"],
+                      entity_type=EventEntityType.SCENARIO,
+                      operation=EventOperation.CREATION,
+                      filter=filt)
+
+    topic = _Topic(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION)
+    assert len(consumer._topic_callbacks_map) == 1
+    assert consumer._topic_callbacks_map[topic] == [_Callback(cb_0, ["foo"], False, filt)]
+
+    collector, args_collector = init_collector()
+    event_matching_filter = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        metadata={"foo": "bar"},
+    )
+    consumer.process_event(event_matching_filter)
+
+    assert collector["cb_0"] == 1
+
+    collector, args_collector = init_collector()
+    event_not_matching_filter = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        metadata={"baz": "qux"},
+    )
+    consumer.process_event(event_not_matching_filter)
+
+    assert collector["cb_0"] == 0
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_predefined_args(scenario):
+    global collector
+    global args_collector
+    consumer = GuiEventConsumer()
+    consumer.on_event(callback=cb_scenario_creation, callback_args=["foo"])
+    collector, args_collector = init_collector()
+    event = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={"predefined_args": [scenario]},
+    )
+    consumer.process_event(event)
+
+    assert collector["cb_scenario_creation"] == 1
+    collector, args_collector = init_collector()
+
+
+def test_process_event_with_predefined_args_and_state(scenario):
+    consumer = GuiEventConsumer(Gui())
+    consumer.broadcast_on_event(callback=cb_scenario_creation_with_state, callback_args=["foo"])
+    event = Event(
+        entity_type=EventEntityType.SCENARIO,
+        operation=EventOperation.CREATION,
+        entity_id="foo",
+        attribute_name=None,
+        attribute_value=None,
+        metadata={"predefined_args": [scenario]},
+    )
+
+    with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
+        consumer.process_event(event)
+        mock_broadcast.assert_called_once_with(cb_scenario_creation_with_state, [event, scenario, "foo"])

+ 0 - 0
tests/tools/__init__.py


+ 1 - 0
tools/gui/generate_pyi.py

@@ -81,6 +81,7 @@ with open(gui_pyi_file, "w", encoding="utf-8") as write_file:
 # Generate Page Builder pyi file (gui/builder/__init__.pyi)
 # ##################################################################################################
 # Types that appear in viselements.json
+
 from datetime import datetime  # noqa: E402, F401
 
 from taipy.core import Cycle, DataNode, Job, Scenario  # noqa: E402, F401