|
@@ -15,7 +15,7 @@ from taipy.common.config import Config
|
|
|
from taipy.core import taipy as tp
|
|
|
from taipy.core._version._version_manager_factory import _VersionManagerFactory
|
|
|
from taipy.core.common.frequency import Frequency
|
|
|
-from taipy.core.notification import EventEntityType, EventOperation
|
|
|
+from taipy.core.notification import EventEntityType, EventOperation, _Registration
|
|
|
from taipy.core.notification._topic import _Topic
|
|
|
from taipy.core.notification.event import Event
|
|
|
from taipy.core.notification.notifier import Notifier
|
|
@@ -23,7 +23,7 @@ from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactor
|
|
|
from taipy.core.submission.submission_status import SubmissionStatus
|
|
|
|
|
|
|
|
|
-def test_register():
|
|
|
+def test_register_unregister():
|
|
|
def find_registration_and_topic(registration_id):
|
|
|
for topic, registrations in Notifier._topics_registrations_list.items():
|
|
|
for registration in registrations:
|
|
@@ -87,6 +87,8 @@ def test_register():
|
|
|
registration_id_5, register_queue_5 = Notifier.register(EventEntityType.SCENARIO)
|
|
|
topic_5, registration_5 = find_registration_and_topic(registration_id_5)
|
|
|
|
|
|
+ assert topic_5 == topic_2
|
|
|
+ assert registration_5 != registration_2
|
|
|
assert isinstance(registration_id_5, str) and registration_id_5 == registration_5.registration_id
|
|
|
assert isinstance(register_queue_5, SimpleQueue)
|
|
|
assert len(Notifier._topics_registrations_list.keys()) == 4
|
|
@@ -95,12 +97,15 @@ def test_register():
|
|
|
assert register_queue_5 in [registration.queue for registration in Notifier._topics_registrations_list[topic_5]]
|
|
|
|
|
|
registration_id_6, register_queue_6 = Notifier.register()
|
|
|
+ topic_6, registration_6 = find_registration_and_topic(registration_id_6)
|
|
|
+ assert topic_6 == topic_0
|
|
|
+ assert registration_6 != registration_0
|
|
|
assert len(Notifier._topics_registrations_list.keys()) == 4
|
|
|
- assert len(Notifier._topics_registrations_list[topic_0]) == 3
|
|
|
+ assert len(Notifier._topics_registrations_list[topic_6]) == 3
|
|
|
|
|
|
Notifier.unregister(registration_id_6)
|
|
|
assert len(Notifier._topics_registrations_list.keys()) == 4
|
|
|
- assert len(Notifier._topics_registrations_list[topic_0]) == 2
|
|
|
+ assert len(Notifier._topics_registrations_list[topic_6]) == 2
|
|
|
|
|
|
Notifier.unregister(registration_id_4)
|
|
|
assert len(Notifier._topics_registrations_list.keys()) == 3
|
|
@@ -249,9 +254,9 @@ def test_matching():
|
|
|
),
|
|
|
_Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"),
|
|
|
)
|
|
|
- assert Notifier._is_matching(Event(EventEntityType.TASK, "task_id", EventOperation.DELETION), _Topic())
|
|
|
+ assert Notifier._is_matching(Event(EventEntityType.TASK, EventOperation.DELETION, "task_id"), _Topic())
|
|
|
assert Notifier._is_matching(
|
|
|
- Event(EventEntityType.TASK, "task_id", EventOperation.DELETION), _Topic(EventEntityType.TASK)
|
|
|
+ Event(EventEntityType.TASK, EventOperation.DELETION, "task_id"), _Topic(EventEntityType.TASK)
|
|
|
)
|
|
|
assert Notifier._is_matching(
|
|
|
Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
|
|
@@ -315,7 +320,7 @@ def test_publish_creation_event():
|
|
|
dn_config = Config.configure_data_node("dn_config")
|
|
|
task_config = Config.configure_task("task_config", print, [dn_config])
|
|
|
scenario_config = Config.configure_scenario(
|
|
|
- "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
|
|
|
+ "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
|
|
|
)
|
|
|
scenario_config.add_sequences({"sequence_config": [task_config]})
|
|
|
|
|
@@ -352,15 +357,15 @@ def test_publish_creation_event():
|
|
|
|
|
|
|
|
|
def test_publish_update_event():
|
|
|
- _, registration_queue = Notifier.register()
|
|
|
|
|
|
dn_config = Config.configure_data_node("dn_config")
|
|
|
task_config = Config.configure_task("task_config", print, [dn_config])
|
|
|
scenario_config = Config.configure_scenario(
|
|
|
- "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
|
|
|
+ "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
|
|
|
)
|
|
|
scenario_config.add_sequences({"sequence_config": [task_config]})
|
|
|
|
|
|
+ _, registration_queue = Notifier.register()
|
|
|
scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
|
|
|
cycle = scenario.cycle
|
|
|
task = scenario.tasks[task_config.id]
|
|
@@ -549,15 +554,14 @@ def test_publish_update_event():
|
|
|
|
|
|
|
|
|
def test_publish_update_event_in_context_manager():
|
|
|
- _, registration_queue = Notifier.register()
|
|
|
-
|
|
|
dn_config = Config.configure_data_node("dn_config")
|
|
|
task_config = Config.configure_task("task_config", print, [dn_config])
|
|
|
scenario_config = Config.configure_scenario(
|
|
|
- "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
|
|
|
+ "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
|
|
|
)
|
|
|
scenario_config.add_sequences({"sequence_config": [task_config]})
|
|
|
|
|
|
+ _, registration_queue = Notifier.register()
|
|
|
scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
|
|
|
cycle = scenario.cycle
|
|
|
task = scenario.tasks[task_config.id]
|
|
@@ -700,7 +704,7 @@ def test_publish_submission_event():
|
|
|
dn_config = Config.configure_data_node("dn_config")
|
|
|
task_config = Config.configure_task("task_config", print, [dn_config])
|
|
|
scenario_config = Config.configure_scenario(
|
|
|
- "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
|
|
|
+ "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
|
|
|
)
|
|
|
scenario_config.add_sequences({"sequence_config": [task_config]})
|
|
|
scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
|
|
@@ -767,7 +771,7 @@ def test_publish_deletion_event():
|
|
|
dn_config = Config.configure_data_node("dn_config")
|
|
|
task_config = Config.configure_task("task_config", print, [dn_config])
|
|
|
scenario_config = Config.configure_scenario(
|
|
|
- "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
|
|
|
+ "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
|
|
|
)
|
|
|
scenario_config.add_sequences({"sequence_config": [task_config]})
|
|
|
scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
|
|
@@ -844,3 +848,101 @@ def test_publish_deletion_event():
|
|
|
and event.attribute_name is None
|
|
|
for i, event in enumerate(published_events)
|
|
|
)
|
|
|
+
|
|
|
+
|
|
|
+def find_registration_and_topics(registration_id: str):
|
|
|
+ topics = set()
|
|
|
+ registration = None
|
|
|
+ for topic, registrations in Notifier._topics_registrations_list.items():
|
|
|
+ for reg in registrations:
|
|
|
+ if reg.registration_id == registration_id:
|
|
|
+ topics.add(topic)
|
|
|
+ registration = reg
|
|
|
+ break
|
|
|
+ return topics, registration
|
|
|
+
|
|
|
+
|
|
|
+def test_register_no_topic():
|
|
|
+ assert len(Notifier._topics_registrations_list) == 0
|
|
|
+ empty_registration = _Registration()
|
|
|
+
|
|
|
+ id, queue = Notifier._register_from_registration(empty_registration)
|
|
|
+
|
|
|
+ assert id == empty_registration.registration_id
|
|
|
+ assert queue == empty_registration.queue
|
|
|
+ assert len(Notifier._topics_registrations_list) == 0
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id)
|
|
|
+ assert retrieved_topics == set()
|
|
|
+ assert retrieved_registration is None
|
|
|
+
|
|
|
+ # Unregister the registration
|
|
|
+ Notifier.unregister(id)
|
|
|
+ assert len(Notifier._topics_registrations_list) == 0
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id)
|
|
|
+ assert retrieved_topics == set()
|
|
|
+ assert retrieved_registration is None
|
|
|
+
|
|
|
+
|
|
|
+def test_register_multiple_topics():
|
|
|
+ assert len(Notifier._topics_registrations_list) == 0
|
|
|
+
|
|
|
+ # Registration_0 with 4 topics
|
|
|
+ registration_0 = _Registration()
|
|
|
+ registration_0.add_topic(EventEntityType.SCENARIO)
|
|
|
+ registration_0.add_topic(EventEntityType.TASK, "task_id")
|
|
|
+ registration_0.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.CREATION)
|
|
|
+ registration_0.add_topic(operation=EventOperation.DELETION)
|
|
|
+ id_0, queue_0 = Notifier._register_from_registration(registration_0)
|
|
|
+
|
|
|
+ assert id_0 == registration_0.registration_id
|
|
|
+ assert queue_0 == registration_0.queue
|
|
|
+ assert len(registration_0.topics) == 4
|
|
|
+ assert len(Notifier._topics_registrations_list) == 4
|
|
|
+ for topic in registration_0.topics:
|
|
|
+ assert topic in Notifier._topics_registrations_list
|
|
|
+ assert len(Notifier._topics_registrations_list[topic]) == 1
|
|
|
+ assert registration_0 in Notifier._topics_registrations_list[topic]
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id_0)
|
|
|
+ assert retrieved_topics == registration_0.topics
|
|
|
+ assert retrieved_registration == registration_0
|
|
|
+
|
|
|
+ # Registration_1 with 3 common topics and 1 new topic
|
|
|
+ registration_1 = _Registration()
|
|
|
+ registration_1.add_topic(EventEntityType.SCENARIO)
|
|
|
+ registration_1.add_topic(EventEntityType.TASK, "task_id")
|
|
|
+ registration_1.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.UPDATE)
|
|
|
+ registration_1.add_topic(operation=EventOperation.DELETION)
|
|
|
+ id_1, queue_1 = Notifier._register_from_registration(registration_1)
|
|
|
+
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id_1)
|
|
|
+ assert retrieved_topics == registration_1.topics
|
|
|
+ assert retrieved_registration == registration_1 != registration_0
|
|
|
+ assert id_1 == registration_1.registration_id != id_0
|
|
|
+ assert queue_1 == registration_1.queue != queue_0
|
|
|
+ assert len(registration_1.topics) == 4
|
|
|
+ assert len(Notifier._topics_registrations_list) == 5
|
|
|
+ for topic in registration_1.topics:
|
|
|
+ assert topic in Notifier._topics_registrations_list
|
|
|
+ if topic.operation == EventOperation.UPDATE:
|
|
|
+ assert len(Notifier._topics_registrations_list[topic]) == 1
|
|
|
+ else:
|
|
|
+ assert len(Notifier._topics_registrations_list[topic]) == 2
|
|
|
+ assert registration_1 in Notifier._topics_registrations_list[topic]
|
|
|
+
|
|
|
+ # Unregister registration_0
|
|
|
+ Notifier.unregister(id_0)
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id_0)
|
|
|
+ assert retrieved_topics == set()
|
|
|
+ assert retrieved_registration is None
|
|
|
+ assert len(Notifier._topics_registrations_list) == 4 # The 4 topics from registration_1
|
|
|
+ for topic in registration_1.topics:
|
|
|
+ assert topic in Notifier._topics_registrations_list
|
|
|
+ assert len(Notifier._topics_registrations_list[topic]) == 1
|
|
|
+ assert registration_1 in Notifier._topics_registrations_list[topic]
|
|
|
+
|
|
|
+ # Unregister registration_1
|
|
|
+ Notifier.unregister(id_1)
|
|
|
+ retrieved_topics, retrieved_registration = find_registration_and_topics(id_1)
|
|
|
+ assert retrieved_topics == set()
|
|
|
+ assert retrieved_registration is None
|
|
|
+ assert len(Notifier._topics_registrations_list) == 0
|