123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from queue import SimpleQueue
- from typing import Any, Dict, List
- import pytest
- from taipy import Orchestrator
- from taipy.common.config import Config
- from taipy.core import taipy as tp
- from taipy.core.common.frequency import Frequency
- from taipy.core.job.status import Status
- from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
- from taipy.core.notification.event import Event, EventEntityType, EventOperation
- from taipy.core.notification.notifier import Notifier
- from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
class Snapshot:
    """
    A captured snapshot of the recording core events consumer.

    Holds every captured event in arrival order, plus counters per entity
    type, per operation and per attribute name, and the list of values
    published for each attribute name.
    """

    def __init__(self) -> None:
        # Events in the order they were captured.
        self.collected_events: List[Event] = []
        # Number of captured events per entity type.
        self.entity_type_collected: Dict[EventEntityType, int] = {}
        # Number of captured events per operation (keyed by `event.operation`).
        self.operation_collected: Dict[EventOperation, int] = {}
        # Number of captured events per attribute name (keyed by `event.attribute_name`).
        self.attr_name_collected: Dict[str, int] = {}
        # Attribute values published for each attribute name, in capture order.
        self.attr_value_collected: Dict[str, List[Any]] = {}

    def capture_event(self, event: "Event") -> None:
        """
        Record one event and update all the counters.

        Parameters:
            event: The event to record. Its `entity_type`, `operation`,
                `attribute_name` and `attribute_value` attributes are read.
        """
        self.collected_events.append(event)
        self.entity_type_collected[event.entity_type] = self.entity_type_collected.get(event.entity_type, 0) + 1
        self.operation_collected[event.operation] = self.operation_collected.get(event.operation, 0) + 1

        attribute_name = event.attribute_name
        if not attribute_name:
            # Events without an attribute name contribute nothing to the attribute statistics.
            return
        self.attr_name_collected[attribute_name] = self.attr_name_collected.get(attribute_name, 0) + 1
        self.attr_value_collected.setdefault(attribute_name, []).append(event.attribute_value)
class RecordingConsumer(_CoreEventConsumerBase):
    """
    A simple, thread-less core events consumer whose received events can be
    captured on demand as snapshots.
    """

    def __init__(self, registration_id: str, queue: SimpleQueue):
        super().__init__(registration_id, queue)

    def capture(self) -> Snapshot:
        """
        Drain the queue into a new snapshot.

        The snapshot holds the events received since the previous capture
        (or since this consumer was created if there was none).
        """
        captured = Snapshot()
        pending = self.queue
        while True:
            if pending.empty():
                break
            captured.capture_event(pending.get())
        return captured

    def process_event(self, event: Event):
        """Do nothing: events are only read when `capture` is called."""

    def start(self):
        """Do nothing: there is nothing to start."""

    def stop(self):
        """Do nothing: there is nothing to stop."""
def identity(x):
    """Return the argument unchanged; used as the task function in these tests."""
    unchanged = x
    return unchanged
def test_events_published_for_scenario_creation():
    """Creating a scenario through the manager publishes exactly six creation events."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    registration_id, event_queue = Notifier.register()
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Expected: one creation event each for the cycle, the task, the sequence
    # and the scenario, plus two for the data nodes.
    _ScenarioManagerFactory._build_manager()._create(scenario_cfg)
    snapshot = recorder.capture()

    assert len(snapshot.collected_events) == 6
    expected_per_entity = {
        EventEntityType.CYCLE: 1,
        EventEntityType.DATA_NODE: 2,
        EventEntityType.TASK: 1,
        EventEntityType.SEQUENCE: 1,
        EventEntityType.SCENARIO: 1,
    }
    for entity_type, expected_count in expected_per_entity.items():
        assert snapshot.entity_type_collected.get(entity_type, 0) == expected_count
    assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 6

    recorder.stop()
def test_no_event_published_for_getting_scenario():
    """Reading scenarios (all of them or a single one) must not publish any event."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    # The scenario is created before registering, so its creation events are not recorded.
    scenario = tp.create_scenario(scenario_cfg)
    registration_id, event_queue = Notifier.register()
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Listing every scenario is silent.
    tp.get_scenarios()
    after_get_all = recorder.capture()
    assert len(after_get_all.collected_events) == 0

    # Fetching a single scenario by id is silent too.
    tp.get(scenario.id)
    after_get_one = recorder.capture()
    assert len(after_get_one.collected_events) == 0

    recorder.stop()
def test_events_published_for_writing_dn():
    """Writing a data node manually publishes five data node update events."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    scenario = tp.create_scenario(scenario_cfg)
    registration_id, event_queue = Notifier.register()
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # A manual write of the input triggers five data node update events: one each for
    # last_edit_date, editor_id, editor_expiration_date, edit_in_progress and edits.
    scenario.the_input.write("test")
    snapshot = recorder.capture()

    expected_updates = 5
    assert len(snapshot.collected_events) == expected_updates
    assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == expected_updates
    assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == expected_updates

    recorder.stop()
@pytest.mark.parametrize("standalone", [False, True])
def test_events_published_for_scenario_submission(standalone):
    """Submitting a scenario publishes the same 18 events with or without the standalone mode."""
    if standalone:
        Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    scenario = tp.create_scenario(scenario_cfg)
    scenario.the_input.write("test")
    registration_id, event_queue = Notifier.register()
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Submitting the scenario is expected to trigger 18 events:
    #   1 scenario submission event
    #   8 data node update events (last_edit_date, editor_id (x2),
    #     editor_expiration_date (x2), edit_in_progress (x2) and edits)
    #   1 job creation event
    #   3 job update events (for status: PENDING, RUNNING and COMPLETED)
    #   1 submission creation event
    #   1 submission update event for jobs
    #   3 submission update events (for submission_status: PENDING, RUNNING and COMPLETED)
    if not standalone:
        scenario.submit()
    else:
        Orchestrator().run()
        scenario.submit(wait=True)
    snapshot = recorder.capture()

    assert len(snapshot.collected_events) == 18

    expected_per_entity = {
        EventEntityType.CYCLE: 0,
        EventEntityType.DATA_NODE: 8,
        EventEntityType.TASK: 0,
        EventEntityType.SEQUENCE: 0,
        EventEntityType.SCENARIO: 1,
        EventEntityType.JOB: 4,
        EventEntityType.SUBMISSION: 5,
    }
    for entity_type, expected_count in expected_per_entity.items():
        assert snapshot.entity_type_collected.get(entity_type, 0) == expected_count

    expected_per_operation = {
        EventOperation.CREATION: 2,
        EventOperation.UPDATE: 15,
        EventOperation.SUBMISSION: 1,
    }
    for operation, expected_count in expected_per_operation.items():
        assert snapshot.operation_collected.get(operation, 0) == expected_count

    # Direct indexing on purpose: a missing attribute name must fail the test.
    expected_per_attribute = {
        "last_edit_date": 1,
        "editor_id": 2,
        "editor_expiration_date": 2,
        "edit_in_progress": 2,
        "edits": 1,
        "status": 3,
        "jobs": 1,
        "submission_status": 3,
    }
    for attribute_name, expected_count in expected_per_attribute.items():
        assert snapshot.attr_name_collected[attribute_name] == expected_count

    recorder.stop()
def test_events_published_for_scenario_deletion():
    """Deleting a submitted scenario publishes eight deletion events and nothing else."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    scenario = tp.create_scenario(scenario_cfg)
    scenario.the_input.write("test")
    scenario.submit()
    registration_id, event_queue = Notifier.register()
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Deleting the scenario is expected to trigger 8 deletion events:
    #   1 for the scenario
    #   1 for the cycle
    #   2 for the data nodes (input and output)
    #   1 for the task
    #   1 for the sequence
    #   1 for the job
    #   1 for the submission
    tp.delete(scenario.id)
    snapshot = recorder.capture()

    assert len(snapshot.collected_events) == 8

    expected_per_entity = {
        EventEntityType.CYCLE: 1,
        EventEntityType.DATA_NODE: 2,
        EventEntityType.TASK: 1,
        EventEntityType.SEQUENCE: 1,
        EventEntityType.SCENARIO: 1,
        EventEntityType.SUBMISSION: 1,
        EventEntityType.JOB: 1,
    }
    for entity_type, expected_count in expected_per_entity.items():
        assert snapshot.entity_type_collected.get(entity_type, 0) == expected_count

    expected_per_operation = {
        EventOperation.UPDATE: 0,
        EventOperation.SUBMISSION: 0,
        EventOperation.DELETION: 8,
    }
    for operation, expected_count in expected_per_operation.items():
        assert snapshot.operation_collected.get(operation, 0) == expected_count

    recorder.stop()
def test_job_events():
    """A consumer registered on JOB only receives job creation and status update events."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    registration_id, event_queue = Notifier.register(entity_type=EventEntityType.JOB)
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Creating the scenario publishes no job event.
    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_cfg)
    after_creation = recorder.capture()
    assert len(after_creation.collected_events) == 0

    # Submitting publishes two job events: the creation, then a status update.
    scenario.submit()
    after_submit = recorder.capture()
    assert len(after_submit.collected_events) == 2
    creation_event, blocked_event = after_submit.collected_events

    assert creation_event.operation == EventOperation.CREATION
    assert creation_event.entity_type == EventEntityType.JOB
    assert creation_event.metadata.get("task_config_id") == the_task_cfg.id

    # NOTE(review): the job is presumably BLOCKED because the input data node was never written.
    assert blocked_event.operation == EventOperation.UPDATE
    assert blocked_event.entity_type == EventEntityType.JOB
    assert blocked_event.metadata.get("task_config_id") == the_task_cfg.id
    assert blocked_event.attribute_name == "status"
    assert blocked_event.attribute_value == Status.BLOCKED

    # Cancelling the job publishes a single status update.
    first_job = tp.get_jobs()[0]
    tp.cancel_job(first_job)
    after_cancel = recorder.capture()
    assert len(after_cancel.collected_events) == 1
    cancel_event = after_cancel.collected_events[0]
    assert cancel_event.metadata.get("task_config_id") == the_task_cfg.id
    assert cancel_event.attribute_name == "status"
    assert cancel_event.attribute_value == Status.CANCELED

    recorder.stop()
def test_scenario_events():
    """A consumer registered on SCENARIO gets one event per creation, submission and deletion."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    registration_id, event_queue = Notifier.register(entity_type=EventEntityType.SCENARIO)
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # Creation: a single CREATION event carrying the scenario config id.
    scenario = tp.create_scenario(scenario_cfg)
    after_creation = recorder.capture()
    assert len(after_creation.collected_events) == 1
    creation_event = after_creation.collected_events[0]
    assert creation_event.operation == EventOperation.CREATION
    assert creation_event.entity_type == EventEntityType.SCENARIO
    assert creation_event.metadata.get("config_id") == scenario.config_id

    # Submission: a single SUBMISSION event carrying the scenario config id.
    scenario.submit()
    after_submit = recorder.capture()
    assert len(after_submit.collected_events) == 1
    submission_event = after_submit.collected_events[0]
    assert submission_event.operation == EventOperation.SUBMISSION
    assert submission_event.entity_type == EventEntityType.SCENARIO
    assert submission_event.metadata.get("config_id") == scenario.config_id

    # Deletion: a single DELETION event.
    tp.delete(scenario.id)
    after_delete = recorder.capture()
    assert len(after_delete.collected_events) == 1
    deletion_event = after_delete.collected_events[0]
    assert deletion_event.operation == EventOperation.DELETION
    assert deletion_event.entity_type == EventEntityType.SCENARIO

    recorder.stop()
def test_data_node_events():
    """A consumer registered on DATA_NODE gets two creation events, then two deletion events."""
    dn_in_cfg = Config.configure_data_node("the_input")
    dn_out_cfg = Config.configure_data_node("the_output")
    the_task_cfg = Config.configure_task("the_task", identity, input=dn_in_cfg, output=dn_out_cfg)
    scenario_cfg = Config.configure_scenario(
        "the_scenario",
        task_configs=[the_task_cfg],
        frequency=Frequency.DAILY,
        sequences={"the_seq": [the_task_cfg]},
    )
    registration_id, event_queue = Notifier.register(entity_type=EventEntityType.DATA_NODE)
    recorder = RecordingConsumer(registration_id, event_queue)
    recorder.start()

    # The scenario has two data nodes, hence two creation events (in either order).
    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_cfg)
    after_creation = recorder.capture()
    assert len(after_creation.collected_events) == 2
    first_created, second_created = after_creation.collected_events
    for created in (first_created, second_created):
        assert created.operation == EventOperation.CREATION
        assert created.entity_type == EventEntityType.DATA_NODE
        assert created.metadata.get("config_id") in [dn_out_cfg.id, dn_in_cfg.id]

    # Deleting the scenario deletes both data nodes.
    tp.delete(scenario.id)
    after_delete = recorder.capture()
    assert len(after_delete.collected_events) == 2
    first_deleted, second_deleted = after_delete.collected_events
    for deleted in (first_deleted, second_deleted):
        assert deleted.operation == EventOperation.DELETION
        assert deleted.entity_type == EventEntityType.DATA_NODE

    recorder.stop()
|