12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948 |
- # Copyright 2021-2025 Avaiga Private Limited
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations under the License.
- from queue import SimpleQueue
- from taipy import Frequency
- from taipy.common.config import Config
- from taipy.core import taipy as tp
- from taipy.core._version._version_manager_factory import _VersionManagerFactory
- from taipy.core.notification import EventEntityType, EventOperation, _Registration
- from taipy.core.notification._topic import _Topic
- from taipy.core.notification.event import Event
- from taipy.core.notification.notifier import Notifier
- from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
- from taipy.core.submission.submission_status import SubmissionStatus
def test_register_unregister():
    """Check the bookkeeping done by ``Notifier.register`` and ``Notifier.unregister``.

    Seven registrations are created over four distinct topics and then removed one by one.
    After every step the test inspects ``Notifier._topics_registrations_list`` and asserts
    both the number of topics and the number of registrations stored under the affected topic.
    """

    def registry():
        # Looked up on every call so the test always inspects the Notifier's current mapping.
        return Notifier._topics_registrations_list

    def locate(wanted_id):
        # Returns the (topic, registration) pair holding `wanted_id`; falls through when absent.
        for candidate_topic, candidates in registry().items():
            for candidate in candidates:
                if candidate.registration_id == wanted_id:
                    return candidate_topic, candidate

    def check_registered(reg_id, reg_queue, topic, registration, topic_count, topic_size):
        # Assertions shared by every freshly created registration.
        assert isinstance(reg_id, str) and reg_id == registration.registration_id
        assert isinstance(reg_queue, SimpleQueue)
        assert len(registry().keys()) == topic_count
        assert len(registry()[topic]) == topic_size
        assert registration.queue == reg_queue
        assert reg_queue in [entry.queue for entry in registry()[topic]]

    assert len(registry()) == 0

    # Two registrations without any filter end up under one single topic.
    registration_id_0, register_queue_0 = Notifier.register()
    topic_0, registration_0 = locate(registration_id_0)
    check_registered(registration_id_0, register_queue_0, topic_0, registration_0, 1, 1)

    registration_id_1, register_queue_1 = Notifier.register()
    topic_1, registration_1 = locate(registration_id_1)
    check_registered(registration_id_1, register_queue_1, topic_1, registration_1, 1, 2)

    # Each new filter combination adds one topic.
    registration_id_2, register_queue_2 = Notifier.register(EventEntityType.SCENARIO)
    topic_2, registration_2 = locate(registration_id_2)
    check_registered(registration_id_2, register_queue_2, topic_2, registration_2, 2, 1)

    registration_id_3, register_queue_3 = Notifier.register(EventEntityType.SCENARIO, "scenario_id")
    topic_3, registration_3 = locate(registration_id_3)
    check_registered(registration_id_3, register_queue_3, topic_3, registration_3, 3, 1)

    registration_id_4, register_queue_4 = Notifier.register(
        EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"
    )
    topic_4, registration_4 = locate(registration_id_4)
    check_registered(registration_id_4, register_queue_4, topic_4, registration_4, 4, 1)

    # Registering again with an already known filter reuses the topic but yields a new registration.
    registration_id_5, register_queue_5 = Notifier.register(EventEntityType.SCENARIO)
    topic_5, registration_5 = locate(registration_id_5)
    assert topic_5 == topic_2
    assert registration_5 != registration_2
    check_registered(registration_id_5, register_queue_5, topic_5, registration_5, 4, 2)

    registration_id_6, register_queue_6 = Notifier.register()
    topic_6, registration_6 = locate(registration_id_6)
    assert topic_6 == topic_0
    assert registration_6 != registration_0
    assert len(registry().keys()) == 4
    assert len(registry()[topic_6]) == 3

    # Unregistering drops the registration; the topic goes away with its last registration.
    Notifier.unregister(registration_id_6)
    assert len(registry().keys()) == 4
    assert len(registry()[topic_6]) == 2

    Notifier.unregister(registration_id_4)
    assert len(registry().keys()) == 3
    assert topic_4 not in registry().keys()

    Notifier.unregister(registration_id_0)
    assert len(registry().keys()) == 3
    assert len(registry()[topic_0]) == 1

    Notifier.unregister(registration_id_1)
    assert len(registry().keys()) == 2
    for removed_topic in [topic_0, topic_1]:
        assert removed_topic not in registry().keys()

    for leftover_id in (registration_id_2, registration_id_3, registration_id_5):
        Notifier.unregister(leftover_id)
    assert len(registry().keys()) == 0
def test_matching():
    """Check ``Notifier._is_matching`` on event/topic pairs that must match and pairs that must not.

    The cases are table-driven: each row pairs an event factory with the topics tried against it.
    A fresh event is built for every single check.
    """

    def cycle_creation():
        return Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION)

    def scenario_submission():
        return Event(
            entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION
        )

    def sequence_tasks_update():
        return Event(
            entity_type=EventEntityType.SEQUENCE,
            entity_id="sequence_id",
            operation=EventOperation.UPDATE,
            attribute_name="tasks",
        )

    def task_deletion_positional():
        # Same event as `task_deletion`, but built through positional arguments.
        return Event(EventEntityType.TASK, EventOperation.DELETION, "task_id")

    def task_deletion():
        return Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION)

    def data_node_creation():
        return Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION)

    def job_deletion():
        return Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION)

    def job_status_update():
        return Event(
            entity_type=EventEntityType.JOB,
            entity_id="job_id",
            operation=EventOperation.UPDATE,
            attribute_name="status",
        )

    # Topics that are expected to accept the event produced by the paired factory.
    accepted = (
        (
            cycle_creation,
            (
                _Topic(),
                _Topic(EventEntityType.CYCLE),
                _Topic(EventEntityType.CYCLE, "cycle_id"),
                _Topic(operation=EventOperation.CREATION),
                _Topic(EventEntityType.CYCLE, "cycle_id", EventOperation.CREATION),
            ),
        ),
        (
            scenario_submission,
            (
                _Topic(),
                _Topic(EventEntityType.SCENARIO),
                _Topic(EventEntityType.SCENARIO, "scenario_id"),
                _Topic(operation=EventOperation.SUBMISSION),
                _Topic(EventEntityType.SCENARIO, "scenario_id", EventOperation.SUBMISSION),
            ),
        ),
        (
            sequence_tasks_update,
            (
                _Topic(),
                _Topic(EventEntityType.SEQUENCE),
                _Topic(EventEntityType.SEQUENCE, "sequence_id"),
                _Topic(operation=EventOperation.UPDATE),
                _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE),
                _Topic(attribute_name="tasks"),
                _Topic(EventEntityType.SEQUENCE, attribute_name="tasks"),
                _Topic(operation=EventOperation.UPDATE, attribute_name="tasks"),
                _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"),
            ),
        ),
        (
            task_deletion_positional,
            (
                _Topic(),
                _Topic(EventEntityType.TASK),
            ),
        ),
        (
            task_deletion,
            (
                _Topic(EventEntityType.TASK, "task_id"),
                _Topic(operation=EventOperation.DELETION),
                _Topic(EventEntityType.TASK, "task_id", EventOperation.DELETION),
            ),
        ),
    )
    for build_event, topics in accepted:
        for topic in topics:
            assert Notifier._is_matching(build_event(), topic)

    # Topics that differ from the event on entity type, entity id, operation or attribute name.
    rejected = (
        (
            data_node_creation,
            (
                _Topic(EventEntityType.CYCLE),
                _Topic(EventEntityType.SCENARIO, "scenario_id"),
                _Topic(EventEntityType.TASK, "task_id", EventOperation.CREATION),
            ),
        ),
        (
            job_deletion,
            (
                _Topic(EventEntityType.JOB, "job_id", EventOperation.CREATION),
                _Topic(EventEntityType.JOB, "job_id_1", EventOperation.DELETION),
            ),
        ),
        (
            job_status_update,
            (
                _Topic(EventEntityType.JOB, "job_id", EventOperation.UPDATE, "submit_id"),
                _Topic(operation=EventOperation.UPDATE, attribute_name="submit_id"),
            ),
        ),
    )
    for build_event, topics in rejected:
        for topic in topics:
            assert not Notifier._is_matching(build_event(), topic)
def test_publish_creation_event():
    """Check the events received by an unfiltered registration when a scenario is created.

    Five CREATION events are expected, for the cycle, data node, task, sequence and scenario,
    in that order, each carrying the entity id and no attribute name.
    """
    _, registration_queue = Notifier.register()

    dn_config = Config.configure_data_node("dn_config")
    task_config = Config.configure_task("task_config", print, [dn_config])
    scenario_config = Config.configure_scenario(
        "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
    )
    scenario_config.add_sequences({"sequence_config": [task_config]})

    # Test CREATION Event
    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
    cycle = scenario.cycle
    task = scenario.tasks[task_config.id]
    dn = scenario.data_nodes[dn_config.id]
    sequence = scenario.sequences["sequence_config"]

    assert registration_queue.qsize() == 5

    # Drain the queue so the events can be compared positionally.
    published_events = []
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    # (entity type, entity id) expected at each position.
    expected = (
        (EventEntityType.CYCLE, cycle.id),
        (EventEntityType.DATA_NODE, dn.id),
        (EventEntityType.TASK, task.id),
        (EventEntityType.SEQUENCE, sequence.id),
        (EventEntityType.SCENARIO, scenario.id),
    )
    for position, event in enumerate(published_events):
        expected_type, expected_id = expected[position]
        assert event.entity_type == expected_type
        assert event.entity_id == expected_id
        assert event.operation == EventOperation.CREATION
        assert event.attribute_name is None
def test_publish_update_event():
    """Check the UPDATE events published while mutating a scenario and its related entities.

    After the five creation events are discarded, every mutation below is followed by an
    assertion on the queue size. Each step adds one event, except the two-key
    ``properties.update`` on the scenario, which adds two. The 26 queued events are then
    compared positionally on entity type, entity id, operation and attribute name.
    """
    dn_config = Config.configure_data_node("dn_config")
    task_config = Config.configure_task("task_config", print, [dn_config])
    scenario_config = Config.configure_scenario(
        "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
    )
    scenario_config.add_sequences({"sequence_config": [task_config]})

    _, registration_queue = Notifier.register()

    def expect_queued(count):
        assert registration_queue.qsize() == count

    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
    cycle = scenario.cycle
    task = scenario.tasks[task_config.id]
    dn = scenario.data_nodes[dn_config.id]
    sequence = scenario.sequences["sequence_config"]

    # Discard the creation events; only updates are of interest here.
    expect_queued(5)
    while registration_queue.qsize():
        registration_queue.get()

    # Test UPDATE Event

    # --- scenario ---
    scenario.is_primary = False
    expect_queued(1)
    tp.set_primary(scenario)
    expect_queued(2)
    tp.subscribe_scenario(print, None, scenario=scenario)
    expect_queued(3)
    tp.unsubscribe_scenario(print, None, scenario=scenario)
    expect_queued(4)
    tp.tag(scenario, "testing")
    expect_queued(5)
    tp.untag(scenario, "testing")
    expect_queued(6)
    scenario.properties["flag"] = "production"
    expect_queued(7)
    scenario.properties.update({"description": "a scenario", "test_mult": True})
    expect_queued(9)
    scenario.properties.pop("test_mult")
    expect_queued(10)
    scenario.name = "my_scenario"
    expect_queued(11)

    # --- cycle ---
    cycle.name = "new cycle name"
    expect_queued(12)
    cycle.properties["valid"] = True
    expect_queued(13)
    cycle.properties.update({"re_run_periodically": True})
    expect_queued(14)
    cycle.properties.pop("re_run_periodically")
    expect_queued(15)

    # --- sequence ---
    sequence.properties["name"] = "weather_forecast"
    expect_queued(16)
    tp.subscribe_sequence(print, None, sequence)
    expect_queued(17)
    tp.unsubscribe_sequence(print, None, sequence)
    expect_queued(18)

    # --- task ---
    task.skippable = True
    expect_queued(19)
    task.properties["number_of_run"] = 2
    expect_queued(20)
    task.properties.update({"debug": True})
    expect_queued(21)
    task.properties.pop("debug")
    expect_queued(22)

    # --- data node ---
    dn.editor_id = "new editor id"
    expect_queued(23)
    dn.properties["sorted"] = True
    expect_queued(24)
    dn.properties.update({"only_fetch_first_100": True})
    expect_queued(25)
    dn.properties.pop("only_fetch_first_100")
    expect_queued(26)

    published_events = []
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    # Expected values per position: 11 scenario, 4 cycle, 3 sequence, 4 task and 4 data node events.
    expected_event_types = (
        [EventEntityType.SCENARIO] * 11
        + [EventEntityType.CYCLE] * 4
        + [EventEntityType.SEQUENCE] * 3
        + [EventEntityType.TASK] * 4
        + [EventEntityType.DATA_NODE] * 4
    )
    expected_attribute_names = (
        ["is_primary"] * 2
        + ["subscribers"] * 2
        + ["tags"] * 2
        + ["properties"] * 5
        + ["name"]
        + ["properties"] * 4
        + ["subscribers"] * 2
        + ["skippable"]
        + ["properties"] * 3
        + ["editor_id"]
        + ["properties"] * 3
    )
    expected_event_entity_id = (
        [scenario.id] * 11 + [cycle.id] * 4 + [sequence.id] * 3 + [task.id] * 4 + [dn.id] * 4
    )
    expected_event_operation_type = [EventOperation.UPDATE] * len(expected_event_types)

    for position, event in enumerate(published_events):
        assert event.entity_type == expected_event_types[position]
        assert event.entity_id == expected_event_entity_id[position]
        assert event.operation == expected_event_operation_type[position]
        assert event.attribute_name == expected_attribute_names[position]
def test_publish_update_event_in_context_manager():
    """Check that updates made inside entity ``with`` blocks are only published on exit.

    While the entities are used as context managers, the queue is asserted to stay empty after
    every mutation. Once the ``with`` statement is left, 16 UPDATE events are expected and are
    compared positionally on entity type, entity id, operation and attribute name.
    """
    dn_config = Config.configure_data_node("dn_config")
    task_config = Config.configure_task("task_config", print, [dn_config])
    scenario_config = Config.configure_scenario(
        "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
    )
    scenario_config.add_sequences({"sequence_config": [task_config]})

    _, registration_queue = Notifier.register()

    def expect_nothing_published():
        assert registration_queue.qsize() == 0

    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
    cycle = scenario.cycle
    task = scenario.tasks[task_config.id]
    dn = scenario.data_nodes[dn_config.id]
    sequence = scenario.sequences["sequence_config"]
    scenario.properties.update({"description": "a scenario"})

    # Five creation events plus the property update above; discard them all.
    assert registration_queue.qsize() == 6
    while registration_queue.qsize():
        registration_queue.get()

    # Test UPDATE Event in Context Manager
    expect_nothing_published()

    # When several entities are entered in one `with`, the last one entered is the first to exit,
    # so the published events start with the scenario and end with the data node.
    with dn as d, task as t, sequence as s, cycle as c, scenario as sc:
        sc.is_primary = True
        expect_nothing_published()
        tp.set_primary(sc)
        expect_nothing_published()
        sc.properties["flag"] = "production"
        expect_nothing_published()
        sc.properties.update({"description": "a scenario"})
        expect_nothing_published()
        sc.properties.pop("description")
        expect_nothing_published()
        sc.name = "my_scenario"
        expect_nothing_published()

        c.name = "another new cycle name"
        expect_nothing_published()
        c.properties["valid"] = True
        expect_nothing_published()
        c.properties.update({"re_run_periodically": True})
        expect_nothing_published()

        s.properties["name"] = "weather_forecast"
        expect_nothing_published()

        t.skippable = True
        expect_nothing_published()
        t.properties["number_of_run"] = 2
        expect_nothing_published()
        t.properties.update({"debug": True})
        expect_nothing_published()

        d.editor_id = "another new editor id"
        expect_nothing_published()
        d.properties["sorted"] = True
        expect_nothing_published()
        d.properties.update({"only_fetch_first_100": True})
        expect_nothing_published()

    published_events = []
    assert registration_queue.qsize() == 16
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    # Expected values per position: 6 scenario, 3 cycle, 1 sequence, 3 task and 3 data node events.
    expected_event_types = (
        [EventEntityType.SCENARIO] * 6
        + [EventEntityType.CYCLE] * 3
        + [EventEntityType.SEQUENCE]
        + [EventEntityType.TASK] * 3
        + [EventEntityType.DATA_NODE] * 3
    )
    expected_attribute_names = (
        ["is_primary"] * 2
        + ["properties"] * 4
        + ["name"]
        + ["properties"] * 3
        + ["skippable"]
        + ["properties"] * 2
        + ["editor_id"]
        + ["properties"] * 2
    )
    expected_event_entity_id = [scenario.id] * 6 + [cycle.id] * 3 + [sequence.id] + [task.id] * 3 + [dn.id] * 3

    for position, event in enumerate(published_events):
        assert event.entity_type == expected_event_types[position]
        assert event.entity_id == expected_event_entity_id[position]
        assert event.operation == EventOperation.UPDATE
        assert event.attribute_name == expected_attribute_names[position]
def test_publish_submission_event():
    """Check the events published when a scenario is submitted.

    Six events are expected after ``scenario.submit()``; they are compared positionally.
    The fifth one (submission status update) must carry the triggering job id in its metadata
    under ``job_triggered_submission_status_changed``. A manual change of the submission status
    afterwards must publish an UPDATE event without that metadata key.
    """
    _, registration_queue = Notifier.register()

    dn_config = Config.configure_data_node("dn_config")
    task_config = Config.configure_task("task_config", print, [dn_config])
    scenario_config = Config.configure_scenario(
        "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
    )
    scenario_config.add_sequences({"sequence_config": [task_config]})
    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)

    # Discard the five creation events.
    assert registration_queue.qsize() == 5
    while registration_queue.qsize():
        registration_queue.get()

    # Test SUBMISSION Event
    submission = scenario.submit()
    job = submission.jobs[0]

    assert registration_queue.qsize() == 6
    published_events = []
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    # (entity type, entity id, operation, attribute name) expected at each position.
    expected = (
        (EventEntityType.SUBMISSION, submission.id, EventOperation.CREATION, None),
        (EventEntityType.JOB, job.id, EventOperation.CREATION, None),
        (EventEntityType.SUBMISSION, submission.id, EventOperation.UPDATE, "jobs"),
        (EventEntityType.JOB, job.id, EventOperation.UPDATE, "status"),
        (EventEntityType.SUBMISSION, submission.id, EventOperation.UPDATE, "submission_status"),
        (EventEntityType.SCENARIO, scenario.id, EventOperation.SUBMISSION, None),
    )
    for position, event in enumerate(published_events):
        expected_type, expected_id, expected_operation, expected_attribute = expected[position]
        assert event.entity_type == expected_type
        assert event.entity_id == expected_id
        assert event.operation == expected_operation
        assert event.attribute_name == expected_attribute

    status_event = published_events[4]
    assert "job_triggered_submission_status_changed" in status_event.metadata
    assert status_event.metadata["job_triggered_submission_status_changed"] == job.id

    # Test updating submission_status manually will not add the job_triggered_submission_status_changed
    # to the metadata as no job was used to update the submission_status
    submission.submission_status = SubmissionStatus.CANCELED

    assert registration_queue.qsize() == 1
    published_event = registration_queue.get()
    assert published_event.entity_type == EventEntityType.SUBMISSION
    assert published_event.entity_id == submission.id
    assert published_event.operation == EventOperation.UPDATE
    assert published_event.attribute_name == "submission_status"
    assert "job_triggered_submission_status_changed" not in published_event.metadata
def test_publish_deletion_event():
    """Check the DELETION events published by ``tp.delete`` and ``tp.clean_all_entities``.

    Phase one deletes a submitted scenario by id and expects seven DELETION events, each with
    the id of the deleted entity. Phase two recreates the scenario, cleans all entities of the
    latest version and expects six DELETION events, of which only the cycle and scenario ones
    carry an entity id.
    """
    _, registration_queue = Notifier.register()

    dn_config = Config.configure_data_node("dn_config")
    task_config = Config.configure_task("task_config", print, [dn_config])
    scenario_config = Config.configure_scenario(
        "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
    )
    scenario_config.add_sequences({"sequence_config": [task_config]})

    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
    cycle = scenario.cycle
    task = scenario.tasks[task_config.id]
    dn = scenario.data_nodes[dn_config.id]
    sequence = scenario.sequences["sequence_config"]
    submission = scenario.submit()
    job = submission.jobs[0]

    # Creation and submission events are not under test here; discard them.
    assert registration_queue.qsize() == 11
    while registration_queue.qsize():
        registration_queue.get()

    # Test DELETION Event
    tp.delete(scenario.id)
    assert registration_queue.qsize() == 7

    published_events = []
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    # (entity type, entity id) expected at each position.
    expected = (
        (EventEntityType.CYCLE, cycle.id),
        (EventEntityType.SEQUENCE, sequence.id),
        (EventEntityType.SCENARIO, scenario.id),
        (EventEntityType.TASK, task.id),
        (EventEntityType.JOB, job.id),
        (EventEntityType.DATA_NODE, dn.id),
        (EventEntityType.SUBMISSION, submission.id),
    )
    expected_event_operation_type = [EventOperation.DELETION] * len(expected)
    for position, event in enumerate(published_events):
        expected_type, expected_id = expected[position]
        assert event.entity_type == expected_type
        assert event.entity_id == expected_id
        assert event.operation == expected_event_operation_type[position]
        assert event.attribute_name is None

    # Second phase: recreate the scenario, then wipe every entity of the latest version.
    scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
    cycle = scenario.cycle
    assert registration_queue.qsize() == 5

    # only to clear the queue
    while registration_queue.qsize():
        registration_queue.get()

    tp.clean_all_entities(_VersionManagerFactory._build_manager()._get_latest_version())
    assert registration_queue.qsize() == 6

    published_events = []
    while registration_queue.qsize():
        published_events.append(registration_queue.get())

    expected = (
        (EventEntityType.JOB, None),
        (EventEntityType.SUBMISSION, None),
        (EventEntityType.CYCLE, cycle.id),
        (EventEntityType.SCENARIO, scenario.id),
        (EventEntityType.TASK, None),
        (EventEntityType.DATA_NODE, None),
    )
    for position, event in enumerate(published_events):
        expected_type, expected_id = expected[position]
        assert event.entity_type == expected_type
        assert event.entity_id == expected_id
        assert event.operation == EventOperation.DELETION
        assert event.attribute_name is None
def find_registration_and_topics(registration_id: str):
    """Collect every topic under which the registration with *registration_id* is stored.

    Scans ``Notifier._topics_registrations_list`` and returns a ``(topics, registration)`` tuple:
    the set of topics whose registrations contain a matching id, and the matching registration
    found under the last such topic (``None`` when no topic holds it).
    """
    matched_topics = set()
    found = None
    for topic, candidates in Notifier._topics_registrations_list.items():
        # Only the first matching registration of each topic is considered.
        hit = next((entry for entry in candidates if entry.registration_id == registration_id), None)
        if hit is not None:
            matched_topics.add(topic)
            found = hit
    return matched_topics, found
def test_register_no_topic():
    """Check that a registration without any topic leaves the Notifier's topic mapping empty.

    ``Notifier._register_from_registration`` must still return the registration's own id and
    queue, but nothing may be stored in ``Notifier._topics_registrations_list``. Unregistering
    that id afterwards must leave the mapping empty as well.
    """
    assert len(Notifier._topics_registrations_list) == 0

    empty_registration = _Registration()
    # Named explicitly so the locals do not shadow the builtin `id` or the stdlib `queue` module name.
    registration_id, registration_queue = Notifier._register_from_registration(empty_registration)

    assert registration_id == empty_registration.registration_id
    assert registration_queue == empty_registration.queue
    assert len(Notifier._topics_registrations_list) == 0

    retrieved_topics, retrieved_registration = find_registration_and_topics(registration_id)
    assert retrieved_topics == set()
    assert retrieved_registration is None

    # Unregister the registration
    Notifier.unregister(registration_id)

    assert len(Notifier._topics_registrations_list) == 0
    retrieved_topics, retrieved_registration = find_registration_and_topics(registration_id)
    assert retrieved_topics == set()
    assert retrieved_registration is None
def test_register_multiple_topics():
    """Check registering and unregistering registrations that each carry several topics.

    Two registrations with four topics each are registered; three of their topics are equal and
    one differs (data node CREATION vs. data node UPDATE). The test asserts the content of
    ``Notifier._topics_registrations_list`` after each registration and each unregistration.
    """

    def registry():
        # Looked up on every call so the test always inspects the Notifier's current mapping.
        return Notifier._topics_registrations_list

    assert len(registry()) == 0

    # Registration_0 with 4 topics
    registration_0 = _Registration()
    registration_0.add_topic(EventEntityType.SCENARIO)
    registration_0.add_topic(EventEntityType.TASK, "task_id")
    registration_0.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.CREATION)
    registration_0.add_topic(operation=EventOperation.DELETION)

    id_0, queue_0 = Notifier._register_from_registration(registration_0)

    assert id_0 == registration_0.registration_id
    assert queue_0 == registration_0.queue
    assert len(registration_0.topics) == 4
    assert len(registry()) == 4
    for topic in registration_0.topics:
        assert topic in registry()
        assert len(registry()[topic]) == 1
        assert registration_0 in registry()[topic]

    found_topics, found_registration = find_registration_and_topics(id_0)
    assert found_topics == registration_0.topics
    assert found_registration == registration_0

    # Registration_1 with 3 common topics and 1 new topic
    registration_1 = _Registration()
    registration_1.add_topic(EventEntityType.SCENARIO)
    registration_1.add_topic(EventEntityType.TASK, "task_id")
    registration_1.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.UPDATE)
    registration_1.add_topic(operation=EventOperation.DELETION)

    id_1, queue_1 = Notifier._register_from_registration(registration_1)

    found_topics, found_registration = find_registration_and_topics(id_1)
    assert found_topics == registration_1.topics
    assert found_registration == registration_1
    assert registration_1 != registration_0
    assert id_1 == registration_1.registration_id
    assert registration_1.registration_id != id_0
    assert queue_1 == registration_1.queue
    assert registration_1.queue != queue_0
    assert len(registration_1.topics) == 4
    assert len(registry()) == 5
    for topic in registration_1.topics:
        assert topic in registry()
        # Only the UPDATE topic is exclusive to registration_1; the others are shared with registration_0.
        expected_size = 1 if topic.operation == EventOperation.UPDATE else 2
        assert len(registry()[topic]) == expected_size
        assert registration_1 in registry()[topic]

    # Unregister registration_0
    Notifier.unregister(id_0)

    found_topics, found_registration = find_registration_and_topics(id_0)
    assert found_topics == set()
    assert found_registration is None
    assert len(registry()) == 4  # The 4 topics from registration_1
    for topic in registration_1.topics:
        assert topic in registry()
        assert len(registry()[topic]) == 1
        assert registration_1 in registry()[topic]

    # Unregister registration_1
    Notifier.unregister(id_1)

    found_topics, found_registration = find_registration_and_topics(id_1)
    assert found_topics == set()
    assert found_registration is None
    assert len(registry()) == 0
|