test_notifier.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. # Copyright 2023 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from queue import SimpleQueue
  12. from src.taipy.core import taipy as tp
  13. from src.taipy.core.notification import EventEntityType, EventOperation
  14. from src.taipy.core.notification._topic import _Topic
  15. from src.taipy.core.notification.event import Event
  16. from src.taipy.core.notification.notifier import Notifier
  17. from taipy.config import Config, Frequency
  18. def test_register():
  19. def find_registration_and_topic(registration_id):
  20. for topic, registrations in Notifier._topics_registrations_list.items():
  21. for registration in registrations:
  22. if registration.registration_id == registration_id:
  23. return topic, registration
  24. assert len(Notifier._topics_registrations_list) == 0
  25. registration_id_0, register_queue_0 = Notifier.register()
  26. topic_0, registration_0 = find_registration_and_topic(registration_id_0)
  27. assert isinstance(registration_id_0, str) and registration_id_0 == registration_0.registration_id
  28. assert isinstance(register_queue_0, SimpleQueue)
  29. assert len(Notifier._topics_registrations_list.keys()) == 1
  30. assert len(Notifier._topics_registrations_list[topic_0]) == 1
  31. assert registration_0.queue == register_queue_0
  32. assert register_queue_0 in [registration.queue for registration in Notifier._topics_registrations_list[topic_0]]
  33. registration_id_1, register_queue_1 = Notifier.register()
  34. topic_1, registration_1 = find_registration_and_topic(registration_id_1)
  35. assert isinstance(registration_id_1, str) and registration_id_1 == registration_1.registration_id
  36. assert isinstance(register_queue_1, SimpleQueue)
  37. assert len(Notifier._topics_registrations_list.keys()) == 1
  38. assert len(Notifier._topics_registrations_list[topic_1]) == 2
  39. assert registration_1.queue == register_queue_1
  40. assert register_queue_1 in [registration.queue for registration in Notifier._topics_registrations_list[topic_1]]
  41. registration_id_2, register_queue_2 = Notifier.register(EventEntityType.SCENARIO)
  42. topic_2, registration_2 = find_registration_and_topic(registration_id_2)
  43. assert isinstance(registration_id_2, str) and registration_id_2 == registration_2.registration_id
  44. assert isinstance(register_queue_2, SimpleQueue)
  45. assert len(Notifier._topics_registrations_list.keys()) == 2
  46. assert len(Notifier._topics_registrations_list[topic_2]) == 1
  47. assert registration_2.queue == register_queue_2
  48. assert register_queue_2 in [registration.queue for registration in Notifier._topics_registrations_list[topic_2]]
  49. registration_id_3, register_queue_3 = Notifier.register(EventEntityType.SCENARIO, "scenario_id")
  50. topic_3, registration_3 = find_registration_and_topic(registration_id_3)
  51. assert isinstance(registration_id_3, str) and registration_id_3 == registration_3.registration_id
  52. assert isinstance(register_queue_3, SimpleQueue)
  53. assert len(Notifier._topics_registrations_list.keys()) == 3
  54. assert len(Notifier._topics_registrations_list[topic_3]) == 1
  55. assert registration_3.queue == register_queue_3
  56. assert register_queue_3 in [registration.queue for registration in Notifier._topics_registrations_list[topic_3]]
  57. registration_id_4, register_queue_4 = Notifier.register(
  58. EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"
  59. )
  60. topic_4, registration_4 = find_registration_and_topic(registration_id_4)
  61. assert isinstance(registration_id_4, str) and registration_id_4 == registration_4.registration_id
  62. assert isinstance(register_queue_4, SimpleQueue)
  63. assert len(Notifier._topics_registrations_list.keys()) == 4
  64. assert len(Notifier._topics_registrations_list[topic_4]) == 1
  65. assert registration_4.queue == register_queue_4
  66. assert register_queue_4 in [registration.queue for registration in Notifier._topics_registrations_list[topic_4]]
  67. registration_id_5, register_queue_5 = Notifier.register(EventEntityType.SCENARIO)
  68. topic_5, registration_5 = find_registration_and_topic(registration_id_5)
  69. assert isinstance(registration_id_5, str) and registration_id_5 == registration_5.registration_id
  70. assert isinstance(register_queue_5, SimpleQueue)
  71. assert len(Notifier._topics_registrations_list.keys()) == 4
  72. assert len(Notifier._topics_registrations_list[topic_5]) == 2
  73. assert registration_5.queue == register_queue_5
  74. assert register_queue_5 in [registration.queue for registration in Notifier._topics_registrations_list[topic_5]]
  75. registration_id_6, register_queue_6 = Notifier.register()
  76. assert len(Notifier._topics_registrations_list.keys()) == 4
  77. assert len(Notifier._topics_registrations_list[topic_0]) == 3
  78. Notifier.unregister(registration_id_6)
  79. assert len(Notifier._topics_registrations_list.keys()) == 4
  80. assert len(Notifier._topics_registrations_list[topic_0]) == 2
  81. Notifier.unregister(registration_id_4)
  82. assert len(Notifier._topics_registrations_list.keys()) == 3
  83. assert topic_4 not in Notifier._topics_registrations_list.keys()
  84. Notifier.unregister(registration_id_0)
  85. assert len(Notifier._topics_registrations_list.keys()) == 3
  86. assert len(Notifier._topics_registrations_list[topic_0]) == 1
  87. Notifier.unregister(registration_id_1)
  88. assert len(Notifier._topics_registrations_list.keys()) == 2
  89. assert all(topic not in Notifier._topics_registrations_list.keys() for topic in [topic_0, topic_1])
  90. Notifier.unregister(registration_id_2)
  91. Notifier.unregister(registration_id_3)
  92. Notifier.unregister(registration_id_5)
  93. assert len(Notifier._topics_registrations_list.keys()) == 0
  94. def test_matching():
  95. assert Notifier._is_matching(
  96. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION), _Topic()
  97. )
  98. assert Notifier._is_matching(
  99. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  100. _Topic(EventEntityType.CYCLE),
  101. )
  102. assert Notifier._is_matching(
  103. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  104. _Topic(EventEntityType.CYCLE, "cycle_id"),
  105. )
  106. assert Notifier._is_matching(
  107. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  108. _Topic(operation=EventOperation.CREATION),
  109. )
  110. assert Notifier._is_matching(
  111. Event(entity_type=EventEntityType.CYCLE, entity_id="cycle_id", operation=EventOperation.CREATION),
  112. _Topic(EventEntityType.CYCLE, "cycle_id", EventOperation.CREATION),
  113. )
  114. assert Notifier._is_matching(
  115. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  116. _Topic(),
  117. )
  118. assert Notifier._is_matching(
  119. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  120. _Topic(EventEntityType.SCENARIO),
  121. )
  122. assert Notifier._is_matching(
  123. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  124. _Topic(
  125. EventEntityType.SCENARIO,
  126. "scenario_id",
  127. ),
  128. )
  129. assert Notifier._is_matching(
  130. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  131. _Topic(operation=EventOperation.SUBMISSION),
  132. )
  133. assert Notifier._is_matching(
  134. Event(entity_type=EventEntityType.SCENARIO, entity_id="scenario_id", operation=EventOperation.SUBMISSION),
  135. _Topic(EventEntityType.SCENARIO, "scenario_id", EventOperation.SUBMISSION),
  136. )
  137. assert Notifier._is_matching(
  138. Event(
  139. entity_type=EventEntityType.SEQUENCE,
  140. entity_id="sequence_id",
  141. operation=EventOperation.UPDATE,
  142. attribute_name=r"tasks",
  143. ),
  144. _Topic(),
  145. )
  146. assert Notifier._is_matching(
  147. Event(
  148. entity_type=EventEntityType.SEQUENCE,
  149. entity_id="sequence_id",
  150. operation=EventOperation.UPDATE,
  151. attribute_name="tasks",
  152. ),
  153. _Topic(EventEntityType.SEQUENCE),
  154. )
  155. assert Notifier._is_matching(
  156. Event(
  157. entity_type=EventEntityType.SEQUENCE,
  158. entity_id="sequence_id",
  159. operation=EventOperation.UPDATE,
  160. attribute_name="tasks",
  161. ),
  162. _Topic(
  163. EventEntityType.SEQUENCE,
  164. "sequence_id",
  165. ),
  166. )
  167. assert Notifier._is_matching(
  168. Event(
  169. entity_type=EventEntityType.SEQUENCE,
  170. entity_id="sequence_id",
  171. operation=EventOperation.UPDATE,
  172. attribute_name="tasks",
  173. ),
  174. _Topic(operation=EventOperation.UPDATE),
  175. )
  176. assert Notifier._is_matching(
  177. Event(
  178. entity_type=EventEntityType.SEQUENCE,
  179. entity_id="sequence_id",
  180. operation=EventOperation.UPDATE,
  181. attribute_name="tasks",
  182. ),
  183. _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE),
  184. )
  185. assert Notifier._is_matching(
  186. Event(
  187. entity_type=EventEntityType.SEQUENCE,
  188. entity_id="sequence_id",
  189. operation=EventOperation.UPDATE,
  190. attribute_name="tasks",
  191. ),
  192. _Topic(attribute_name="tasks"),
  193. )
  194. assert Notifier._is_matching(
  195. Event(
  196. entity_type=EventEntityType.SEQUENCE,
  197. entity_id="sequence_id",
  198. operation=EventOperation.UPDATE,
  199. attribute_name="tasks",
  200. ),
  201. _Topic(EventEntityType.SEQUENCE, attribute_name="tasks"),
  202. )
  203. assert Notifier._is_matching(
  204. Event(
  205. entity_type=EventEntityType.SEQUENCE,
  206. entity_id="sequence_id",
  207. operation=EventOperation.UPDATE,
  208. attribute_name="tasks",
  209. ),
  210. _Topic(operation=EventOperation.UPDATE, attribute_name="tasks"),
  211. )
  212. assert Notifier._is_matching(
  213. Event(
  214. entity_type=EventEntityType.SEQUENCE,
  215. entity_id="sequence_id",
  216. operation=EventOperation.UPDATE,
  217. attribute_name="tasks",
  218. ),
  219. _Topic(EventEntityType.SEQUENCE, "sequence_id", EventOperation.UPDATE, "tasks"),
  220. )
  221. assert Notifier._is_matching(Event(EventEntityType.TASK, "task_id", EventOperation.DELETION), _Topic())
  222. assert Notifier._is_matching(
  223. Event(EventEntityType.TASK, "task_id", EventOperation.DELETION), _Topic(EventEntityType.TASK)
  224. )
  225. assert Notifier._is_matching(
  226. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  227. _Topic(
  228. EventEntityType.TASK,
  229. "task_id",
  230. ),
  231. )
  232. assert Notifier._is_matching(
  233. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  234. _Topic(operation=EventOperation.DELETION),
  235. )
  236. assert Notifier._is_matching(
  237. Event(entity_type=EventEntityType.TASK, entity_id="task_id", operation=EventOperation.DELETION),
  238. _Topic(EventEntityType.TASK, "task_id", EventOperation.DELETION),
  239. )
  240. assert not Notifier._is_matching(
  241. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  242. _Topic(EventEntityType.CYCLE),
  243. )
  244. assert not Notifier._is_matching(
  245. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  246. _Topic(EventEntityType.SCENARIO, "scenario_id"),
  247. )
  248. assert not Notifier._is_matching(
  249. Event(entity_type=EventEntityType.DATA_NODE, entity_id="dn_id", operation=EventOperation.CREATION),
  250. _Topic(EventEntityType.TASK, "task_id", EventOperation.CREATION),
  251. )
  252. assert not Notifier._is_matching(
  253. Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION),
  254. _Topic(EventEntityType.JOB, "job_id", EventOperation.CREATION),
  255. )
  256. assert not Notifier._is_matching(
  257. Event(entity_type=EventEntityType.JOB, entity_id="job_id", operation=EventOperation.DELETION),
  258. _Topic(EventEntityType.JOB, "job_id_1", EventOperation.DELETION),
  259. )
  260. assert not Notifier._is_matching(
  261. Event(
  262. entity_type=EventEntityType.JOB,
  263. entity_id="job_id",
  264. operation=EventOperation.UPDATE,
  265. attribute_name="status",
  266. ),
  267. _Topic(EventEntityType.JOB, "job_id", EventOperation.UPDATE, "submit_id"),
  268. )
  269. assert not Notifier._is_matching(
  270. Event(
  271. entity_type=EventEntityType.JOB,
  272. entity_id="job_id",
  273. operation=EventOperation.UPDATE,
  274. attribute_name="status",
  275. ),
  276. _Topic(operation=EventOperation.UPDATE, attribute_name="submit_id"),
  277. )
  278. def test_publish_creation_event():
  279. _, registration_queue = Notifier.register()
  280. dn_config = Config.configure_data_node("dn_config")
  281. task_config = Config.configure_task("task_config", print, [dn_config])
  282. scenario_config = Config.configure_scenario(
  283. "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
  284. )
  285. scenario_config.add_sequences({"sequence_config": [task_config]})
  286. # Test CREATION Event
  287. scenario = tp.create_scenario(scenario_config)
  288. cycle = scenario.cycle
  289. task = scenario.tasks[task_config.id]
  290. dn = scenario.data_nodes[dn_config.id]
  291. sequence = scenario.sequences["sequence_config"]
  292. assert registration_queue.qsize() == 5
  293. published_events = []
  294. while registration_queue.qsize() != 0:
  295. published_events.append(registration_queue.get())
  296. expected_event_types = [
  297. EventEntityType.CYCLE,
  298. EventEntityType.DATA_NODE,
  299. EventEntityType.TASK,
  300. EventEntityType.SEQUENCE,
  301. EventEntityType.SCENARIO,
  302. ]
  303. expected_event_entity_id = [cycle.id, dn.id, task.id, sequence.id, scenario.id]
  304. assert all(
  305. [
  306. event.entity_type == expected_event_types[i]
  307. and event.entity_id == expected_event_entity_id[i]
  308. and event.operation == EventOperation.CREATION
  309. and event.attribute_name is None
  310. for i, event in enumerate(published_events)
  311. ]
  312. )
  313. def test_publish_update_event():
  314. _, registration_queue = Notifier.register()
  315. dn_config = Config.configure_data_node("dn_config")
  316. task_config = Config.configure_task("task_config", print, [dn_config])
  317. scenario_config = Config.configure_scenario(
  318. "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
  319. )
  320. scenario_config.add_sequences({"sequence_config": [task_config]})
  321. scenario = tp.create_scenario(scenario_config)
  322. cycle = scenario.cycle
  323. task = scenario.tasks[task_config.id]
  324. dn = scenario.data_nodes[dn_config.id]
  325. sequence = scenario.sequences["sequence_config"]
  326. assert registration_queue.qsize() == 5
  327. while registration_queue.qsize() > 0:
  328. registration_queue.get()
  329. # Test UPDATE Event
  330. scenario.is_primary = False
  331. assert registration_queue.qsize() == 1
  332. tp.set_primary(scenario)
  333. assert registration_queue.qsize() == 2
  334. tp.subscribe_scenario(print, None, scenario=scenario)
  335. assert registration_queue.qsize() == 3
  336. tp.unsubscribe_scenario(print, None, scenario=scenario)
  337. assert registration_queue.qsize() == 4
  338. tp.tag(scenario, "testing")
  339. assert registration_queue.qsize() == 5
  340. tp.untag(scenario, "testing")
  341. assert registration_queue.qsize() == 6
  342. scenario.properties["flag"] = "production"
  343. assert registration_queue.qsize() == 7
  344. scenario.properties.update({"description": "a scenario", "test_mult": True})
  345. assert registration_queue.qsize() == 9
  346. scenario.properties.pop("test_mult")
  347. assert registration_queue.qsize() == 10
  348. scenario.name = "my_scenario"
  349. assert registration_queue.qsize() == 11
  350. cycle.name = "new cycle name"
  351. assert registration_queue.qsize() == 12
  352. cycle.properties["valid"] = True
  353. assert registration_queue.qsize() == 13
  354. cycle.properties.update({"re_run_periodically": True})
  355. assert registration_queue.qsize() == 14
  356. cycle.properties.pop("re_run_periodically")
  357. assert registration_queue.qsize() == 15
  358. sequence.properties["name"] = "weather_forecast"
  359. assert registration_queue.qsize() == 16
  360. tp.subscribe_sequence(print, None, sequence)
  361. assert registration_queue.qsize() == 17
  362. tp.unsubscribe_sequence(print, None, sequence)
  363. assert registration_queue.qsize() == 18
  364. task.skippable = True
  365. assert registration_queue.qsize() == 19
  366. task.properties["number_of_run"] = 2
  367. assert registration_queue.qsize() == 20
  368. task.properties.update({"debug": True})
  369. assert registration_queue.qsize() == 21
  370. task.properties.pop("debug")
  371. assert registration_queue.qsize() == 22
  372. dn.editor_id = "new editor id"
  373. assert registration_queue.qsize() == 23
  374. dn.properties["sorted"] = True
  375. assert registration_queue.qsize() == 24
  376. dn.properties.update({"only_fetch_first_100": True})
  377. assert registration_queue.qsize() == 25
  378. dn.properties.pop("only_fetch_first_100")
  379. assert registration_queue.qsize() == 26
  380. published_events = []
  381. while registration_queue.qsize() != 0:
  382. published_events.append(registration_queue.get())
  383. expected_event_types = [
  384. EventEntityType.SCENARIO,
  385. EventEntityType.SCENARIO,
  386. EventEntityType.SCENARIO,
  387. EventEntityType.SCENARIO,
  388. EventEntityType.SCENARIO,
  389. EventEntityType.SCENARIO,
  390. EventEntityType.SCENARIO,
  391. EventEntityType.SCENARIO,
  392. EventEntityType.SCENARIO,
  393. EventEntityType.SCENARIO,
  394. EventEntityType.SCENARIO,
  395. EventEntityType.CYCLE,
  396. EventEntityType.CYCLE,
  397. EventEntityType.CYCLE,
  398. EventEntityType.CYCLE,
  399. EventEntityType.SEQUENCE,
  400. EventEntityType.SEQUENCE,
  401. EventEntityType.SEQUENCE,
  402. EventEntityType.TASK,
  403. EventEntityType.TASK,
  404. EventEntityType.TASK,
  405. EventEntityType.TASK,
  406. EventEntityType.DATA_NODE,
  407. EventEntityType.DATA_NODE,
  408. EventEntityType.DATA_NODE,
  409. EventEntityType.DATA_NODE,
  410. ]
  411. expected_attribute_names = [
  412. "is_primary",
  413. "is_primary",
  414. "subscribers",
  415. "subscribers",
  416. "tags",
  417. "tags",
  418. "properties",
  419. "properties",
  420. "properties",
  421. "properties",
  422. "properties",
  423. "name",
  424. "properties",
  425. "properties",
  426. "properties",
  427. "properties",
  428. "subscribers",
  429. "subscribers",
  430. "skippable",
  431. "properties",
  432. "properties",
  433. "properties",
  434. "editor_id",
  435. "properties",
  436. "properties",
  437. "properties",
  438. ]
  439. expected_event_entity_id = [
  440. scenario.id,
  441. scenario.id,
  442. scenario.id,
  443. scenario.id,
  444. scenario.id,
  445. scenario.id,
  446. scenario.id,
  447. scenario.id,
  448. scenario.id,
  449. scenario.id,
  450. scenario.id,
  451. cycle.id,
  452. cycle.id,
  453. cycle.id,
  454. cycle.id,
  455. sequence.id,
  456. sequence.id,
  457. sequence.id,
  458. task.id,
  459. task.id,
  460. task.id,
  461. task.id,
  462. dn.id,
  463. dn.id,
  464. dn.id,
  465. dn.id,
  466. ]
  467. expected_event_operation_type = [EventOperation.UPDATE] * len(expected_event_types)
  468. assert all(
  469. [
  470. event.entity_type == expected_event_types[i]
  471. and event.entity_id == expected_event_entity_id[i]
  472. and event.operation == expected_event_operation_type[i]
  473. and event.attribute_name == expected_attribute_names[i]
  474. for i, event in enumerate(published_events)
  475. ]
  476. )
  477. def test_publish_update_event_in_context_manager():
  478. _, registration_queue = Notifier.register()
  479. dn_config = Config.configure_data_node("dn_config")
  480. task_config = Config.configure_task("task_config", print, [dn_config])
  481. scenario_config = Config.configure_scenario(
  482. "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
  483. )
  484. scenario_config.add_sequences({"sequence_config": [task_config]})
  485. scenario = tp.create_scenario(scenario_config)
  486. cycle = scenario.cycle
  487. task = scenario.tasks[task_config.id]
  488. dn = scenario.data_nodes[dn_config.id]
  489. sequence = scenario.sequences["sequence_config"]
  490. scenario.properties.update({"description": "a scenario"})
  491. assert registration_queue.qsize() == 6
  492. while registration_queue.qsize() > 0:
  493. registration_queue.get()
  494. # Test UPDATE Event in Context Manager
  495. assert registration_queue.qsize() == 0
  496. # If multiple entities is in context, the last to enter will be the first to exit
  497. # So the published event will have the order starting with scenario first and ending with dn
  498. with dn as d, task as t, sequence as s, cycle as c, scenario as sc:
  499. sc.is_primary = True
  500. assert registration_queue.qsize() == 0
  501. tp.set_primary(sc)
  502. assert registration_queue.qsize() == 0
  503. sc.properties["flag"] = "production"
  504. assert registration_queue.qsize() == 0
  505. sc.properties.update({"description": "a scenario"})
  506. assert registration_queue.qsize() == 0
  507. sc.properties.pop("description")
  508. assert registration_queue.qsize() == 0
  509. sc.name = "my_scenario"
  510. assert registration_queue.qsize() == 0
  511. c.name = "another new cycle name"
  512. assert registration_queue.qsize() == 0
  513. c.properties["valid"] = True
  514. assert registration_queue.qsize() == 0
  515. c.properties.update({"re_run_periodically": True})
  516. assert registration_queue.qsize() == 0
  517. s.properties["name"] = "weather_forecast"
  518. assert registration_queue.qsize() == 0
  519. t.skippable = True
  520. assert registration_queue.qsize() == 0
  521. t.properties["number_of_run"] = 2
  522. assert registration_queue.qsize() == 0
  523. t.properties.update({"debug": True})
  524. assert registration_queue.qsize() == 0
  525. d.editor_id = "another new editor id"
  526. assert registration_queue.qsize() == 0
  527. d.properties["sorted"] = True
  528. assert registration_queue.qsize() == 0
  529. d.properties.update({"only_fetch_first_100": True})
  530. assert registration_queue.qsize() == 0
  531. published_events = []
  532. assert registration_queue.qsize() == 16
  533. while registration_queue.qsize() != 0:
  534. published_events.append(registration_queue.get())
  535. expected_event_types = [
  536. EventEntityType.SCENARIO,
  537. EventEntityType.SCENARIO,
  538. EventEntityType.SCENARIO,
  539. EventEntityType.SCENARIO,
  540. EventEntityType.SCENARIO,
  541. EventEntityType.SCENARIO,
  542. EventEntityType.CYCLE,
  543. EventEntityType.CYCLE,
  544. EventEntityType.CYCLE,
  545. EventEntityType.SEQUENCE,
  546. EventEntityType.TASK,
  547. EventEntityType.TASK,
  548. EventEntityType.TASK,
  549. EventEntityType.DATA_NODE,
  550. EventEntityType.DATA_NODE,
  551. EventEntityType.DATA_NODE,
  552. ]
  553. expected_attribute_names = [
  554. "is_primary",
  555. "is_primary",
  556. "properties",
  557. "properties",
  558. "properties",
  559. "properties",
  560. "name",
  561. "properties",
  562. "properties",
  563. "properties",
  564. "skippable",
  565. "properties",
  566. "properties",
  567. "editor_id",
  568. "properties",
  569. "properties",
  570. ]
  571. expected_event_entity_id = [
  572. scenario.id,
  573. scenario.id,
  574. scenario.id,
  575. scenario.id,
  576. scenario.id,
  577. scenario.id,
  578. cycle.id,
  579. cycle.id,
  580. cycle.id,
  581. sequence.id,
  582. task.id,
  583. task.id,
  584. task.id,
  585. dn.id,
  586. dn.id,
  587. dn.id,
  588. ]
  589. assert all(
  590. [
  591. event.entity_type == expected_event_types[i]
  592. and event.entity_id == expected_event_entity_id[i]
  593. and event.operation == EventOperation.UPDATE
  594. and event.attribute_name == expected_attribute_names[i]
  595. for i, event in enumerate(published_events)
  596. ]
  597. )
  598. def test_publish_submission_event():
  599. _, registration_queue = Notifier.register()
  600. dn_config = Config.configure_data_node("dn_config")
  601. task_config = Config.configure_task("task_config", print, [dn_config])
  602. scenario_config = Config.configure_scenario(
  603. "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
  604. )
  605. scenario_config.add_sequences({"sequence_config": [task_config]})
  606. scenario = tp.create_scenario(scenario_config)
  607. assert registration_queue.qsize() == 5
  608. while registration_queue.qsize() > 0:
  609. registration_queue.get()
  610. # Test SUBMISSION Event
  611. job = scenario.submit()[0]
  612. assert registration_queue.qsize() == 6
  613. published_events = []
  614. while registration_queue.qsize() != 0:
  615. published_events.append(registration_queue.get())
  616. expected_operations = [
  617. EventOperation.CREATION,
  618. EventOperation.CREATION,
  619. EventOperation.UPDATE,
  620. EventOperation.UPDATE,
  621. EventOperation.UPDATE,
  622. EventOperation.SUBMISSION,
  623. ]
  624. expected_attribute_names = [None, None, "jobs", "status", "submission_status", None]
  625. expected_event_types = [
  626. EventEntityType.SUBMISSION,
  627. EventEntityType.JOB,
  628. EventEntityType.SUBMISSION,
  629. EventEntityType.JOB,
  630. EventEntityType.SUBMISSION,
  631. EventEntityType.SCENARIO,
  632. ]
  633. expected_event_entity_id = [job.submit_id, job.id, job.submit_id, job.id, job.submit_id, scenario.id]
  634. assert all(
  635. [
  636. event.entity_type == expected_event_types[i]
  637. and event.entity_id == expected_event_entity_id[i]
  638. and event.operation == expected_operations[i]
  639. and event.attribute_name == expected_attribute_names[i]
  640. for i, event in enumerate(published_events)
  641. ]
  642. )
  643. def test_publish_deletion_event():
  644. _, registration_queue = Notifier.register()
  645. dn_config = Config.configure_data_node("dn_config")
  646. task_config = Config.configure_task("task_config", print, [dn_config])
  647. scenario_config = Config.configure_scenario(
  648. "scenario_config", [task_config], frequency=Frequency.DAILY, flag="test"
  649. )
  650. scenario_config.add_sequences({"sequence_config": [task_config]})
  651. scenario = tp.create_scenario(scenario_config)
  652. cycle = scenario.cycle
  653. task = scenario.tasks[task_config.id]
  654. dn = scenario.data_nodes[dn_config.id]
  655. sequence = scenario.sequences["sequence_config"]
  656. job = scenario.submit()[0]
  657. assert registration_queue.qsize() == 11
  658. while registration_queue.qsize() > 0:
  659. registration_queue.get()
  660. # Test DELETION Event
  661. tp.delete(scenario.id)
  662. assert registration_queue.qsize() == 7
  663. published_events = []
  664. while registration_queue.qsize() != 0:
  665. published_events.append(registration_queue.get())
  666. expected_event_types = [
  667. EventEntityType.CYCLE,
  668. EventEntityType.SEQUENCE,
  669. EventEntityType.SCENARIO,
  670. EventEntityType.TASK,
  671. EventEntityType.JOB,
  672. EventEntityType.DATA_NODE,
  673. EventEntityType.SUBMISSION,
  674. ]
  675. expected_event_entity_id = [cycle.id, sequence.id, scenario.id, task.id, job.id, dn.id, job.submit_id]
  676. expected_event_operation_type = [EventOperation.DELETION] * len(expected_event_types)
  677. assert all(
  678. [
  679. event.entity_type == expected_event_types[i]
  680. and event.entity_id == expected_event_entity_id[i]
  681. and event.operation == expected_event_operation_type[i]
  682. and event.attribute_name is None
  683. for i, event in enumerate(published_events)
  684. ]
  685. )
  686. scenario = tp.create_scenario(scenario_config)
  687. cycle = scenario.cycle
  688. assert registration_queue.qsize() == 5
  689. # only to clear the queue
  690. while registration_queue.qsize() != 0:
  691. registration_queue.get()
  692. tp.clean_all_entities_by_version()
  693. assert registration_queue.qsize() == 5
  694. published_events = []
  695. while registration_queue.qsize() != 0:
  696. published_events.append(registration_queue.get())
  697. expected_event_types = [
  698. EventEntityType.JOB,
  699. EventEntityType.CYCLE,
  700. EventEntityType.SCENARIO,
  701. EventEntityType.TASK,
  702. EventEntityType.DATA_NODE,
  703. ]
  704. expected_event_entity_id = [None, cycle.id, scenario.id, None, None]
  705. assert all(
  706. [
  707. event.entity_type == expected_event_types[i]
  708. and event.entity_id == expected_event_entity_id[i]
  709. and event.operation == EventOperation.DELETION
  710. and event.attribute_name is None
  711. for i, event in enumerate(published_events)
  712. ]
  713. )