test_notifier.py 35 KB


  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from queue import SimpleQueue
  12. from taipy import Frequency
  13. from taipy.common.config import Config
  14. from taipy.core import taipy as tp
  15. from taipy.core._version._version_manager_factory import _VersionManagerFactory
  16. from taipy.core.notification import EventEntityType, EventOperation, _Registration
  17. from taipy.core.notification._topic import _Topic
  18. from taipy.core.notification.event import Event
  19. from taipy.core.notification.notifier import Notifier
  20. from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
  21. from taipy.core.submission.submission_status import SubmissionStatus
  22. def test_register_unregister():
  23. def find_registration_and_topic(registration_id):
  24. for topic, registrations in Notifier._topics_registrations_list.items():
  25. for registration in registrations:
  26. if registration.registration_id == registration_id:
  27. return topic, registration
  28. assert len(Notifier._topics_registrations_list) == 0
  29. registration_id_0, register_queue_0 = Notifier.register()
  30. topic_0, registration_0 = find_registration_and_topic(registration_id_0)
  31. assert isinstance(registration_id_0, str) and registration_id_0 == registration_0.registration_id
  32. assert isinstance(register_queue_0, SimpleQueue)
  33. assert len(Notifier._topics_registrations_list.keys()) == 1
  34. assert len(Notifier._topics_registrations_list[topic_0]) == 1
  35. assert registration_0.queue == register_queue_0
  36. assert register_queue_0 in [registration.queue for registration in Notifier._topics_registrations_list[topic_0]]
  37. registration_id_1, register_queue_1 = Notifier.register()
  38. topic_1, registration_1 = find_registration_and_topic(registration_id_1)
  39. assert isinstance(registration_id_1, str) and registration_id_1 == registration_1.registration_id
  40. assert isinstance(register_queue_1, SimpleQueue)
  41. assert len(Notifier._topics_registrations_list.keys()) == 1
  42. assert len(Notifier._topics_registrations_list[topic_1]) == 2
  43. assert registration_1.queue == register_queue_1
  44. assert register_queue_1 in [registration.queue for registration in Notifier._topics_registrations_list[topic_1]]
  45. registration_id_2, register_queue_2 = Notifier.register(EventEntityType.SCENARIO)
  46. topic_2, registration_2 = find_registration_and_topic(registration_id_2)
  47. assert isinstance(registration_id_2, str) and registration_id_2 == registration_2.registration_id
  48. assert isinstance(register_queue_2, SimpleQueue)
  49. assert len(Notifier._topics_registrations_list.keys()) == 2
  50. assert len(Notifier._topics_registrations_list[topic_2]) == 1
  51. assert registration_2.queue == register_queue_2
  52. assert register_queue_2 in [registration.queue for registration in Notifier._topics_registrations_list[topic_2]]
  53. registration_id_3, register_queue_3 = Notifier.register(EventEntityType.SCENARIO, "scenario_id")
  54. topic_3, registration_3 = find_registration_and_topic(registration_id_3)
  55. assert isinstance(registration_id_3, str) and registration_id_3 == registration_3.registration_id
  56. assert isinstance(register_queue_3, SimpleQueue)
  57. assert len(Notifier._topics_registrations_list.keys()) == 3
  58. assert len(Notifier._topics_registrations_list[topic_3]) == 1
  59. assert registration_3.queue == register_queue_3
  60. assert register_queue_3 in [registration.queue for registration in Notifier._topics_registrations_list[topic_3]]
  61. registration_id_4, register_queue_4 = Notifier.register(
  62. EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"
  63. )
  64. topic_4, registration_4 = find_registration_and_topic(registration_id_4)
  65. assert isinstance(registration_id_4, str) and registration_id_4 == registration_4.registration_id
  66. assert isinstance(register_queue_4, SimpleQueue)
  67. assert len(Notifier._topics_registrations_list.keys()) == 4
  68. assert len(Notifier._topics_registrations_list[topic_4]) == 1
  69. assert registration_4.queue == register_queue_4
  70. assert register_queue_4 in [registration.queue for registration in Notifier._topics_registrations_list[topic_4]]
  71. registration_id_5, register_queue_5 = Notifier.register(EventEntityType.SCENARIO)
  72. topic_5, registration_5 = find_registration_and_topic(registration_id_5)
  73. assert topic_5 == topic_2
  74. assert registration_5 != registration_2
  75. assert isinstance(registration_id_5, str) and registration_id_5 == registration_5.registration_id
  76. assert isinstance(register_queue_5, SimpleQueue)
  77. assert len(Notifier._topics_registrations_list.keys()) == 4
  78. assert len(Notifier._topics_registrations_list[topic_5]) == 2
  79. assert registration_5.queue == register_queue_5
  80. assert register_queue_5 in [registration.queue for registration in Notifier._topics_registrations_list[topic_5]]
  81. registration_id_6, register_queue_6 = Notifier.register()
  82. topic_6, registration_6 = find_registration_and_topic(registration_id_6)
  83. assert topic_6 == topic_0
  84. assert registration_6 != registration_0
  85. assert len(Notifier._topics_registrations_list.keys()) == 4
  86. assert len(Notifier._topics_registrations_list[topic_6]) == 3
  87. Notifier.unregister(registration_id_6)
  88. assert len(Notifier._topics_registrations_list.keys()) == 4
  89. assert len(Notifier._topics_registrations_list[topic_6]) == 2
  90. Notifier.unregister(registration_id_4)
  91. assert len(Notifier._topics_registrations_list.keys()) == 3
  92. assert topic_4 not in Notifier._topics_registrations_list.keys()
  93. Notifier.unregister(registration_id_0)
  94. assert len(Notifier._topics_registrations_list.keys()) == 3
  95. assert len(Notifier._topics_registrations_list[topic_0]) == 1
  96. Notifier.unregister(registration_id_1)
  97. assert len(Notifier._topics_registrations_list.keys()) == 2
  98. assert all(topic not in Notifier._topics_registrations_list.keys() for topic in [topic_0, topic_1])
  99. Notifier.unregister(registration_id_2)
  100. Notifier.unregister(registration_id_3)
  101. Notifier.unregister(registration_id_5)
  102. assert len(Notifier._topics_registrations_list.keys()) == 0
  103. def test_matching():
  104. assert Notifier._is_matching(
  105. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION), _Topic()
  106. )
  107. assert Notifier._is_matching(
  108. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  109. _Topic(EventEntityType.CYCLE),
  110. )
  111. assert Notifier._is_matching(
  112. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  113. _Topic(EventEntityType.CYCLE, "cycle_id"),
  114. )
  115. assert Notifier._is_matching(
  116. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  117. _Topic(operation=EventOperation.CREATION),
  118. )
  119. assert Notifier._is_matching(
  120. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  121. _Topic(EventEntityType.CYCLE, "cycle_id", EventOperation.CREATION),
  122. )
  123. assert Notifier._is_matching(
  124. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  125. _Topic(),
  126. )
  127. assert Notifier._is_matching(
  128. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  129. _Topic(EventEntityType.SCENARIO),
  130. )
  131. assert Notifier._is_matching(
  132. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  133. _Topic(
  134. EventEntityType.SCENARIO,
  135. "scenario_id",
  136. ),
  137. )
  138. assert Notifier._is_matching(
  139. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  140. _Topic(operation=EventOperation.SUBMISSION),
  141. )
  142. assert Notifier._is_matching(
  143. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  144. _Topic(EventEntityType.SCENARIO, "scenario_id", EventOperation.SUBMISSION),
  145. )
  146. assert Notifier._is_matching(
  147. Event(
  148. entity_type=EventEntityType.SEQUENCE,
  149. entity_id="sequence_id",
  150. operation=EventOperation.UPDATE,
  151. attribute_name=r"tasks",
  152. ),
  153. _Topic(),
  154. )
  155. assert Notifier._is_matching(
  156. Event(
  157. entity_type=EventEntityType.SEQUENCE,
  158. entity_id="sequence_id",
  159. operation=EventOperation.UPDATE,
  160. attribute_name="tasks",
  161. ),
  162. _Topic(EventEntityType.SEQUENCE),
  163. )
  164. assert Notifier._is_matching(
  165. Event(
  166. entity_type=EventEntityType.SEQUENCE,
  167. entity_id="sequence_id",
  168. operation=EventOperation.UPDATE,
  169. attribute_name="tasks",
  170. ),
  171. _Topic(
  172. EventEntityType.SEQUENCE,
  173. "sequence_id",
  174. ),
  175. )
  176. assert Notifier._is_matching(
  177. Event(
  178. entity_type=EventEntityType.SEQUENCE,
  179. entity_id="sequence_id",
  180. operation=EventOperation.UPDATE,
  181. attribute_name="tasks",
  182. ),
  183. _Topic(operation=EventOperation.UPDATE),
  184. )
  185. assert Notifier._is_matching(
  186. Event(
  187. entity_type=EventEntityType.SEQUENCE,
  188. entity_id="sequence_id",
  189. operation=EventOperation.UPDATE,
  190. attribute_name="tasks",
  191. ),
  192. _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE),
  193. )
  194. assert Notifier._is_matching(
  195. Event(
  196. entity_type=EventEntityType.SEQUENCE,
  197. entity_id="sequence_id",
  198. operation=EventOperation.UPDATE,
  199. attribute_name="tasks",
  200. ),
  201. _Topic(attribute_name="tasks"),
  202. )
  203. assert Notifier._is_matching(
  204. Event(
  205. entity_type=EventEntityType.SEQUENCE,
  206. entity_id="sequence_id",
  207. operation=EventOperation.UPDATE,
  208. attribute_name="tasks",
  209. ),
  210. _Topic(EventEntityType.SEQUENCE, attribute_name="tasks"),
  211. )
  212. assert Notifier._is_matching(
  213. Event(
  214. entity_type=EventEntityType.SEQUENCE,
  215. entity_id="sequence_id",
  216. operation=EventOperation.UPDATE,
  217. attribute_name="tasks",
  218. ),
  219. _Topic(operation=EventOperation.UPDATE, attribute_name="tasks"),
  220. )
  221. assert Notifier._is_matching(
  222. Event(
  223. entity_type=EventEntityType.SEQUENCE,
  224. entity_id="sequence_id",
  225. operation=EventOperation.UPDATE,
  226. attribute_name="tasks",
  227. ),
  228. _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"),
  229. )
  230. assert Notifier._is_matching(Event(EventEntityType.TASK, EventOperation.DELETION, "task_id"), _Topic())
  231. assert Notifier._is_matching(
  232. Event(EventEntityType.TASK, EventOperation.DELETION, "task_id"), _Topic(EventEntityType.TASK)
  233. )
  234. assert Notifier._is_matching(
  235. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  236. _Topic(
  237. EventEntityType.TASK,
  238. "task_id",
  239. ),
  240. )
  241. assert Notifier._is_matching(
  242. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  243. _Topic(operation=EventOperation.DELETION),
  244. )
  245. assert Notifier._is_matching(
  246. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  247. _Topic(EventEntityType.TASK, "task_id", EventOperation.DELETION),
  248. )
  249. assert not Notifier._is_matching(
  250. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  251. _Topic(EventEntityType.CYCLE),
  252. )
  253. assert not Notifier._is_matching(
  254. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  255. _Topic(EventEntityType.SCENARIO, "scenario_id"),
  256. )
  257. assert not Notifier._is_matching(
  258. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  259. _Topic(EventEntityType.TASK, "task_id", EventOperation.CREATION),
  260. )
  261. assert not Notifier._is_matching(
  262. Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION),
  263. _Topic(EventEntityType.JOB, "job_id", EventOperation.CREATION),
  264. )
  265. assert not Notifier._is_matching(
  266. Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION),
  267. _Topic(EventEntityType.JOB, "job_id_1", EventOperation.DELETION),
  268. )
  269. assert not Notifier._is_matching(
  270. Event(
  271. entity_type=EventEntityType.JOB,
  272. entity_id="job_id",
  273. operation=EventOperation.UPDATE,
  274. attribute_name="status",
  275. ),
  276. _Topic(EventEntityType.JOB, "job_id", EventOperation.UPDATE, "submit_id"),
  277. )
  278. assert not Notifier._is_matching(
  279. Event(
  280. entity_type=EventEntityType.JOB,
  281. entity_id="job_id",
  282. operation=EventOperation.UPDATE,
  283. attribute_name="status",
  284. ),
  285. _Topic(operation=EventOperation.UPDATE, attribute_name="submit_id"),
  286. )
  287. def test_publish_creation_event():
  288. _, registration_queue = Notifier.register()
  289. dn_config = Config.configure_data_node("dn_config")
  290. task_config = Config.configure_task("task_config", print, [dn_config])
  291. scenario_config = Config.configure_scenario(
  292. "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
  293. )
  294. scenario_config.add_sequences({"sequence_config": [task_config]})
  295. # Test CREATION Event
  296. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  297. cycle = scenario.cycle
  298. task = scenario.tasks[task_config.id]
  299. dn = scenario.data_nodes[dn_config.id]
  300. sequence = scenario.sequences["sequence_config"]
  301. assert registration_queue.qsize() == 5
  302. published_events = []
  303. while registration_queue.qsize() != 0:
  304. published_events.append(registration_queue.get())
  305. expected_event_types = [
  306. EventEntityType.CYCLE,
  307. EventEntityType.DATA_NODE,
  308. EventEntityType.TASK,
  309. EventEntityType.SEQUENCE,
  310. EventEntityType.SCENARIO,
  311. ]
  312. expected_event_entity_id = [cycle.id, dn.id, task.id, sequence.id, scenario.id]
  313. assert all(
  314. event.entity_type == expected_event_types[i]
  315. and event.entity_id == expected_event_entity_id[i]
  316. and event.operation == EventOperation.CREATION
  317. and event.attribute_name is None
  318. for i, event in enumerate(published_events)
  319. )
  320. def test_publish_update_event():
  321. dn_config = Config.configure_data_node("dn_config")
  322. task_config = Config.configure_task("task_config", print, [dn_config])
  323. scenario_config = Config.configure_scenario(
  324. "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
  325. )
  326. scenario_config.add_sequences({"sequence_config": [task_config]})
  327. _, registration_queue = Notifier.register()
  328. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  329. cycle = scenario.cycle
  330. task = scenario.tasks[task_config.id]
  331. dn = scenario.data_nodes[dn_config.id]
  332. sequence = scenario.sequences["sequence_config"]
  333. assert registration_queue.qsize() == 5
  334. while registration_queue.qsize() > 0:
  335. registration_queue.get()
  336. # Test UPDATE Event
  337. scenario.is_primary = False
  338. assert registration_queue.qsize() == 1
  339. tp.set_primary(scenario)
  340. assert registration_queue.qsize() == 2
  341. tp.subscribe_scenario(print, None, scenario=scenario)
  342. assert registration_queue.qsize() == 3
  343. tp.unsubscribe_scenario(print, None, scenario=scenario)
  344. assert registration_queue.qsize() == 4
  345. tp.tag(scenario, "testing")
  346. assert registration_queue.qsize() == 5
  347. tp.untag(scenario, "testing")
  348. assert registration_queue.qsize() == 6
  349. scenario.properties["flag"] = "production"
  350. assert registration_queue.qsize() == 7
  351. scenario.properties.update({"description": "a scenario", "test_mult": True})
  352. assert registration_queue.qsize() == 9
  353. scenario.properties.pop("test_mult")
  354. assert registration_queue.qsize() == 10
  355. scenario.name = "my_scenario"
  356. assert registration_queue.qsize() == 11
  357. cycle.name = "new cycle name"
  358. assert registration_queue.qsize() == 12
  359. cycle.properties["valid"] = True
  360. assert registration_queue.qsize() == 13
  361. cycle.properties.update({"re_run_periodically": True})
  362. assert registration_queue.qsize() == 14
  363. cycle.properties.pop("re_run_periodically")
  364. assert registration_queue.qsize() == 15
  365. sequence.properties["name"] = "weather_forecast"
  366. assert registration_queue.qsize() == 16
  367. tp.subscribe_sequence(print, None, sequence)
  368. assert registration_queue.qsize() == 17
  369. tp.unsubscribe_sequence(print, None, sequence)
  370. assert registration_queue.qsize() == 18
  371. task.skippable = True
  372. assert registration_queue.qsize() == 19
  373. task.properties["number_of_run"] = 2
  374. assert registration_queue.qsize() == 20
  375. task.properties.update({"debug": True})
  376. assert registration_queue.qsize() == 21
  377. task.properties.pop("debug")
  378. assert registration_queue.qsize() == 22
  379. dn.editor_id = "new editor id"
  380. assert registration_queue.qsize() == 23
  381. dn.properties["sorted"] = True
  382. assert registration_queue.qsize() == 24
  383. dn.properties.update({"only_fetch_first_100": True})
  384. assert registration_queue.qsize() == 25
  385. dn.properties.pop("only_fetch_first_100")
  386. assert registration_queue.qsize() == 26
  387. published_events = []
  388. while registration_queue.qsize() != 0:
  389. published_events.append(registration_queue.get())
  390. expected_event_types = [
  391. EventEntityType.SCENARIO,
  392. EventEntityType.SCENARIO,
  393. EventEntityType.SCENARIO,
  394. EventEntityType.SCENARIO,
  395. EventEntityType.SCENARIO,
  396. EventEntityType.SCENARIO,
  397. EventEntityType.SCENARIO,
  398. EventEntityType.SCENARIO,
  399. EventEntityType.SCENARIO,
  400. EventEntityType.SCENARIO,
  401. EventEntityType.SCENARIO,
  402. EventEntityType.CYCLE,
  403. EventEntityType.CYCLE,
  404. EventEntityType.CYCLE,
  405. EventEntityType.CYCLE,
  406. EventEntityType.SEQUENCE,
  407. EventEntityType.SEQUENCE,
  408. EventEntityType.SEQUENCE,
  409. EventEntityType.TASK,
  410. EventEntityType.TASK,
  411. EventEntityType.TASK,
  412. EventEntityType.TASK,
  413. EventEntityType.DATA_NODE,
  414. EventEntityType.DATA_NODE,
  415. EventEntityType.DATA_NODE,
  416. EventEntityType.DATA_NODE,
  417. ]
  418. expected_attribute_names = [
  419. "is_primary",
  420. "is_primary",
  421. "subscribers",
  422. "subscribers",
  423. "tags",
  424. "tags",
  425. "properties",
  426. "properties",
  427. "properties",
  428. "properties",
  429. "properties",
  430. "name",
  431. "properties",
  432. "properties",
  433. "properties",
  434. "properties",
  435. "subscribers",
  436. "subscribers",
  437. "skippable",
  438. "properties",
  439. "properties",
  440. "properties",
  441. "editor_id",
  442. "properties",
  443. "properties",
  444. "properties",
  445. ]
  446. expected_event_entity_id = [
  447. scenario.id,
  448. scenario.id,
  449. scenario.id,
  450. scenario.id,
  451. scenario.id,
  452. scenario.id,
  453. scenario.id,
  454. scenario.id,
  455. scenario.id,
  456. scenario.id,
  457. scenario.id,
  458. cycle.id,
  459. cycle.id,
  460. cycle.id,
  461. cycle.id,
  462. sequence.id,
  463. sequence.id,
  464. sequence.id,
  465. task.id,
  466. task.id,
  467. task.id,
  468. task.id,
  469. dn.id,
  470. dn.id,
  471. dn.id,
  472. dn.id,
  473. ]
  474. expected_event_operation_type = [EventOperation.UPDATE] * len(expected_event_types)
  475. assert all(
  476. event.entity_type == expected_event_types[i]
  477. and event.entity_id == expected_event_entity_id[i]
  478. and event.operation == expected_event_operation_type[i]
  479. and event.attribute_name == expected_attribute_names[i]
  480. for i, event in enumerate(published_events)
  481. )
  482. def test_publish_update_event_in_context_manager():
  483. dn_config = Config.configure_data_node("dn_config")
  484. task_config = Config.configure_task("task_config", print, [dn_config])
  485. scenario_config = Config.configure_scenario(
  486. "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
  487. )
  488. scenario_config.add_sequences({"sequence_config": [task_config]})
  489. _, registration_queue = Notifier.register()
  490. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  491. cycle = scenario.cycle
  492. task = scenario.tasks[task_config.id]
  493. dn = scenario.data_nodes[dn_config.id]
  494. sequence = scenario.sequences["sequence_config"]
  495. scenario.properties.update({"description": "a scenario"})
  496. assert registration_queue.qsize() == 6
  497. while registration_queue.qsize() > 0:
  498. registration_queue.get()
  499. # Test UPDATE Event in Context Manager
  500. assert registration_queue.qsize() == 0
  501. # If multiple entities is in context, the last to enter will be the first to exit
  502. # So the published event will have the order starting with scenario first and ending with dn
  503. with dn as d, task as t, sequence as s, cycle as c, scenario as sc:
  504. sc.is_primary = True
  505. assert registration_queue.qsize() == 0
  506. tp.set_primary(sc)
  507. assert registration_queue.qsize() == 0
  508. sc.properties["flag"] = "production"
  509. assert registration_queue.qsize() == 0
  510. sc.properties.update({"description": "a scenario"})
  511. assert registration_queue.qsize() == 0
  512. sc.properties.pop("description")
  513. assert registration_queue.qsize() == 0
  514. sc.name = "my_scenario"
  515. assert registration_queue.qsize() == 0
  516. c.name = "another new cycle name"
  517. assert registration_queue.qsize() == 0
  518. c.properties["valid"] = True
  519. assert registration_queue.qsize() == 0
  520. c.properties.update({"re_run_periodically": True})
  521. assert registration_queue.qsize() == 0
  522. s.properties["name"] = "weather_forecast"
  523. assert registration_queue.qsize() == 0
  524. t.skippable = True
  525. assert registration_queue.qsize() == 0
  526. t.properties["number_of_run"] = 2
  527. assert registration_queue.qsize() == 0
  528. t.properties.update({"debug": True})
  529. assert registration_queue.qsize() == 0
  530. d.editor_id = "another new editor id"
  531. assert registration_queue.qsize() == 0
  532. d.properties["sorted"] = True
  533. assert registration_queue.qsize() == 0
  534. d.properties.update({"only_fetch_first_100": True})
  535. assert registration_queue.qsize() == 0
  536. published_events = []
  537. assert registration_queue.qsize() == 16
  538. while registration_queue.qsize() != 0:
  539. published_events.append(registration_queue.get())
  540. expected_event_types = [
  541. EventEntityType.SCENARIO,
  542. EventEntityType.SCENARIO,
  543. EventEntityType.SCENARIO,
  544. EventEntityType.SCENARIO,
  545. EventEntityType.SCENARIO,
  546. EventEntityType.SCENARIO,
  547. EventEntityType.CYCLE,
  548. EventEntityType.CYCLE,
  549. EventEntityType.CYCLE,
  550. EventEntityType.SEQUENCE,
  551. EventEntityType.TASK,
  552. EventEntityType.TASK,
  553. EventEntityType.TASK,
  554. EventEntityType.DATA_NODE,
  555. EventEntityType.DATA_NODE,
  556. EventEntityType.DATA_NODE,
  557. ]
  558. expected_attribute_names = [
  559. "is_primary",
  560. "is_primary",
  561. "properties",
  562. "properties",
  563. "properties",
  564. "properties",
  565. "name",
  566. "properties",
  567. "properties",
  568. "properties",
  569. "skippable",
  570. "properties",
  571. "properties",
  572. "editor_id",
  573. "properties",
  574. "properties",
  575. ]
  576. expected_event_entity_id = [
  577. scenario.id,
  578. scenario.id,
  579. scenario.id,
  580. scenario.id,
  581. scenario.id,
  582. scenario.id,
  583. cycle.id,
  584. cycle.id,
  585. cycle.id,
  586. sequence.id,
  587. task.id,
  588. task.id,
  589. task.id,
  590. dn.id,
  591. dn.id,
  592. dn.id,
  593. ]
  594. assert all(
  595. event.entity_type == expected_event_types[i]
  596. and event.entity_id == expected_event_entity_id[i]
  597. and event.operation == EventOperation.UPDATE
  598. and event.attribute_name == expected_attribute_names[i]
  599. for i, event in enumerate(published_events)
  600. )
  601. def test_publish_submission_event():
  602. _, registration_queue = Notifier.register()
  603. dn_config = Config.configure_data_node("dn_config")
  604. task_config = Config.configure_task("task_config", print, [dn_config])
  605. scenario_config = Config.configure_scenario(
  606. "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
  607. )
  608. scenario_config.add_sequences({"sequence_config": [task_config]})
  609. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  610. assert registration_queue.qsize() == 5
  611. while registration_queue.qsize() > 0:
  612. registration_queue.get()
  613. # Test SUBMISSION Event
  614. submission = scenario.submit()
  615. job = submission.jobs[0]
  616. assert registration_queue.qsize() == 6
  617. published_events = []
  618. while registration_queue.qsize() != 0:
  619. published_events.append(registration_queue.get())
  620. expected_operations = [
  621. EventOperation.CREATION,
  622. EventOperation.CREATION,
  623. EventOperation.UPDATE,
  624. EventOperation.UPDATE,
  625. EventOperation.UPDATE,
  626. EventOperation.SUBMISSION,
  627. ]
  628. expected_attribute_names = [None, None, "jobs", "status", "submission_status", None]
  629. expected_event_types = [
  630. EventEntityType.SUBMISSION,
  631. EventEntityType.JOB,
  632. EventEntityType.SUBMISSION,
  633. EventEntityType.JOB,
  634. EventEntityType.SUBMISSION,
  635. EventEntityType.SCENARIO,
  636. ]
  637. expected_event_entity_id = [submission.id, job.id, submission.id, job.id, submission.id, scenario.id]
  638. assert all(
  639. event.entity_type == expected_event_types[i]
  640. and event.entity_id == expected_event_entity_id[i]
  641. and event.operation == expected_operations[i]
  642. and event.attribute_name == expected_attribute_names[i]
  643. for i, event in enumerate(published_events)
  644. )
  645. assert "job_triggered_submission_status_changed" in published_events[4].metadata
  646. assert published_events[4].metadata["job_triggered_submission_status_changed"] == job.id
  647. # Test updating submission_status manually will not add the job_triggered_submission_status_changed
  648. # to the metadata as no job was used to update the submission_status
  649. submission.submission_status = SubmissionStatus.CANCELED
  650. assert registration_queue.qsize() == 1
  651. published_event = registration_queue.get()
  652. assert published_event.entity_type == EventEntityType.SUBMISSION
  653. assert published_event.entity_id == submission.id
  654. assert published_event.operation == EventOperation.UPDATE
  655. assert published_event.attribute_name == "submission_status"
  656. assert "job_triggered_submission_status_changed" not in published_event.metadata
  657. def test_publish_deletion_event():
  658. _, registration_queue = Notifier.register()
  659. dn_config = Config.configure_data_node("dn_config")
  660. task_config = Config.configure_task("task_config", print, [dn_config])
  661. scenario_config = Config.configure_scenario(
  662. "scenario_config", [task_config], frequency=Frequency.DAILY, properties={"flag": "test"}
  663. )
  664. scenario_config.add_sequences({"sequence_config": [task_config]})
  665. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  666. cycle = scenario.cycle
  667. task = scenario.tasks[task_config.id]
  668. dn = scenario.data_nodes[dn_config.id]
  669. sequence = scenario.sequences["sequence_config"]
  670. submission = scenario.submit()
  671. job = submission.jobs[0]
  672. assert registration_queue.qsize() == 11
  673. while registration_queue.qsize() > 0:
  674. registration_queue.get()
  675. # Test DELETION Event
  676. tp.delete(scenario.id)
  677. assert registration_queue.qsize() == 7
  678. published_events = []
  679. while registration_queue.qsize() != 0:
  680. published_events.append(registration_queue.get())
  681. expected_event_types = [
  682. EventEntityType.CYCLE,
  683. EventEntityType.SEQUENCE,
  684. EventEntityType.SCENARIO,
  685. EventEntityType.TASK,
  686. EventEntityType.JOB,
  687. EventEntityType.DATA_NODE,
  688. EventEntityType.SUBMISSION,
  689. ]
  690. expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id, submission.id]
  691. expected_event_operation_type = [EventOperation.DELETION] * len(expected_event_types)
  692. assert all(
  693. event.entity_type == expected_event_types[i]
  694. and event.entity_id == expected_event_entity_id[i]
  695. and event.operation == expected_event_operation_type[i]
  696. and event.attribute_name is None
  697. for i, event in enumerate(published_events)
  698. )
  699. scenario = _ScenarioManagerFactory._build_manager()._create(scenario_config)
  700. cycle = scenario.cycle
  701. assert registration_queue.qsize() == 5
  702. # only to clear the queue
  703. while registration_queue.qsize() != 0:
  704. registration_queue.get()
  705. tp.clean_all_entities(_VersionManagerFactory._build_manager()._get_latest_version())
  706. assert registration_queue.qsize() == 6
  707. published_events = []
  708. while registration_queue.qsize() != 0:
  709. published_events.append(registration_queue.get())
  710. expected_event_types = [
  711. EventEntityType.JOB,
  712. EventEntityType.SUBMISSION,
  713. EventEntityType.CYCLE,
  714. EventEntityType.SCENARIO,
  715. EventEntityType.TASK,
  716. EventEntityType.DATA_NODE,
  717. ]
  718. expected_event_entity_id = [None, None, cycle.id, scenario.id, None, None]
  719. assert all(
  720. event.entity_type == expected_event_types[i]
  721. and event.entity_id == expected_event_entity_id[i]
  722. and event.operation == EventOperation.DELETION
  723. and event.attribute_name is None
  724. for i, event in enumerate(published_events)
  725. )
  726. def find_registration_and_topics(registration_id: str):
  727. topics = set()
  728. registration = None
  729. for topic, registrations in Notifier._topics_registrations_list.items():
  730. for reg in registrations:
  731. if reg.registration_id == registration_id:
  732. topics.add(topic)
  733. registration = reg
  734. break
  735. return topics, registration
  736. def test_register_no_topic():
  737. assert len(Notifier._topics_registrations_list) == 0
  738. empty_registration = _Registration()
  739. id, queue = Notifier._register_from_registration(empty_registration)
  740. assert id == empty_registration.registration_id
  741. assert queue == empty_registration.queue
  742. assert len(Notifier._topics_registrations_list) == 0
  743. retrieved_topics, retrieved_registration = find_registration_and_topics(id)
  744. assert retrieved_topics == set()
  745. assert retrieved_registration is None
  746. # Unregister the registration
  747. Notifier.unregister(id)
  748. assert len(Notifier._topics_registrations_list) == 0
  749. retrieved_topics, retrieved_registration = find_registration_and_topics(id)
  750. assert retrieved_topics == set()
  751. assert retrieved_registration is None
  752. def test_register_multiple_topics():
  753. assert len(Notifier._topics_registrations_list) == 0
  754. # Registration_0 with 4 topics
  755. registration_0 = _Registration()
  756. registration_0.add_topic(EventEntityType.SCENARIO)
  757. registration_0.add_topic(EventEntityType.TASK, "task_id")
  758. registration_0.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.CREATION)
  759. registration_0.add_topic(operation=EventOperation.DELETION)
  760. id_0, queue_0 = Notifier._register_from_registration(registration_0)
  761. assert id_0 == registration_0.registration_id
  762. assert queue_0 == registration_0.queue
  763. assert len(registration_0.topics) == 4
  764. assert len(Notifier._topics_registrations_list) == 4
  765. for topic in registration_0.topics:
  766. assert topic in Notifier._topics_registrations_list
  767. assert len(Notifier._topics_registrations_list[topic]) == 1
  768. assert registration_0 in Notifier._topics_registrations_list[topic]
  769. retrieved_topics, retrieved_registration = find_registration_and_topics(id_0)
  770. assert retrieved_topics == registration_0.topics
  771. assert retrieved_registration == registration_0
  772. # Registration_1 with 3 common topics and 1 new topic
  773. registration_1 = _Registration()
  774. registration_1.add_topic(EventEntityType.SCENARIO)
  775. registration_1.add_topic(EventEntityType.TASK, "task_id")
  776. registration_1.add_topic(EventEntityType.DATA_NODE, operation=EventOperation.UPDATE)
  777. registration_1.add_topic(operation=EventOperation.DELETION)
  778. id_1, queue_1 = Notifier._register_from_registration(registration_1)
  779. retrieved_topics, retrieved_registration = find_registration_and_topics(id_1)
  780. assert retrieved_topics == registration_1.topics
  781. assert retrieved_registration == registration_1 != registration_0
  782. assert id_1 == registration_1.registration_id != id_0
  783. assert queue_1 == registration_1.queue != queue_0
  784. assert len(registration_1.topics) == 4
  785. assert len(Notifier._topics_registrations_list) == 5
  786. for topic in registration_1.topics:
  787. assert topic in Notifier._topics_registrations_list
  788. if topic.operation == EventOperation.UPDATE:
  789. assert len(Notifier._topics_registrations_list[topic]) == 1
  790. else:
  791. assert len(Notifier._topics_registrations_list[topic]) == 2
  792. assert registration_1 in Notifier._topics_registrations_list[topic]
  793. # Unregister registration_0
  794. Notifier.unregister(id_0)
  795. retrieved_topics, retrieved_registration = find_registration_and_topics(id_0)
  796. assert retrieved_topics == set()
  797. assert retrieved_registration is None
  798. assert len(Notifier._topics_registrations_list) == 4 # The 4 topics from registration_1
  799. for topic in registration_1.topics:
  800. assert topic in Notifier._topics_registrations_list
  801. assert len(Notifier._topics_registrations_list[topic]) == 1
  802. assert registration_1 in Notifier._topics_registrations_list[topic]
  803. # Unregister registration_1
  804. Notifier.unregister(id_1)
  805. retrieved_topics, retrieved_registration = find_registration_and_topics(id_1)
  806. assert retrieved_topics == set()
  807. assert retrieved_registration is None
  808. assert len(Notifier._topics_registrations_list) == 0