test_events_published.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from queue import SimpleQueue
  12. from typing import Any, Dict, List
  13. import pytest
  14. from taipy import Orchestrator
  15. from taipy.common.config import Config
  16. from taipy.core import taipy as tp
  17. from taipy.core.common.frequency import Frequency
  18. from taipy.core.job.status import Status
  19. from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
  20. from taipy.core.notification.event import Event, EventEntityType, EventOperation
  21. from taipy.core.notification.notifier import Notifier
  22. from taipy.core.scenario._scenario_manager_factory import _ScenarioManagerFactory
  23. class Snapshot:
  24. """
  25. A captured snapshot of the recording core events consumer.
  26. """
  27. def __init__(self) -> None:
  28. self.collected_events: List[Event] = []
  29. self.entity_type_collected: Dict[EventEntityType, int] = {}
  30. self.operation_collected: Dict[EventEntityType, int] = {}
  31. self.attr_name_collected: Dict[EventEntityType, int] = {}
  32. self.attr_value_collected: Dict[EventEntityType, List[Any]] = {}
  33. def capture_event(self, event):
  34. self.collected_events.append(event)
  35. self.entity_type_collected[event.entity_type] = self.entity_type_collected.get(event.entity_type, 0) + 1
  36. self.operation_collected[event.operation] = self.operation_collected.get(event.operation, 0) + 1
  37. if event.attribute_name:
  38. self.attr_name_collected[event.attribute_name] = self.attr_name_collected.get(event.attribute_name, 0) + 1
  39. if self.attr_value_collected.get(event.attribute_name, None):
  40. self.attr_value_collected[event.attribute_name].append(event.attribute_value)
  41. else:
  42. self.attr_value_collected[event.attribute_name] = [event.attribute_value]
  43. class RecordingConsumer(_CoreEventConsumerBase):
  44. """
  45. A straightforward and no-thread core events consumer that allows to
  46. capture snapshots of received events.
  47. """
  48. def __init__(self, registration_id: str, queue: SimpleQueue):
  49. super().__init__(registration_id, queue)
  50. def capture(self) -> Snapshot:
  51. """
  52. Capture a snapshot of events received between the previous snapshot
  53. (or from the start of this consumer).
  54. """
  55. snapshot = Snapshot()
  56. while not self.queue.empty():
  57. event = self.queue.get()
  58. snapshot.capture_event(event)
  59. return snapshot
  60. def process_event(self, event: Event):
  61. # Nothing to do
  62. pass
  63. def start(self):
  64. # Nothing to do here
  65. pass
  66. def stop(self):
  67. # Nothing to do here either
  68. pass
  69. def identity(x):
  70. return x
  71. def test_events_published_for_scenario_creation():
  72. input_config = Config.configure_data_node("the_input")
  73. output_config = Config.configure_data_node("the_output")
  74. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  75. sc_config = Config.configure_scenario(
  76. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  77. )
  78. register_id_0, register_queue_0 = Notifier.register()
  79. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  80. all_evts.start()
  81. # Create a scenario via the manager
  82. # should only trigger 6 creation events (for cycle, data node(x2), task, sequence and scenario)
  83. _ScenarioManagerFactory._build_manager()._create(sc_config)
  84. snapshot = all_evts.capture()
  85. assert len(snapshot.collected_events) == 6
  86. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 1
  87. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 2
  88. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 1
  89. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 1
  90. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  91. assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 6
  92. all_evts.stop()
  93. def test_no_event_published_for_getting_scenario():
  94. input_config = Config.configure_data_node("the_input")
  95. output_config = Config.configure_data_node("the_output")
  96. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  97. sc_config = Config.configure_scenario(
  98. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  99. )
  100. scenario = tp.create_scenario(sc_config)
  101. register_id_0, register_queue_0 = Notifier.register()
  102. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  103. all_evts.start()
  104. # Get all scenarios does not trigger any event
  105. tp.get_scenarios()
  106. snapshot = all_evts.capture()
  107. assert len(snapshot.collected_events) == 0
  108. # Get one scenario does not trigger any event
  109. tp.get(scenario.id)
  110. snapshot = all_evts.capture()
  111. assert len(snapshot.collected_events) == 0
  112. all_evts.stop()
  113. def test_events_published_for_writing_dn():
  114. input_config = Config.configure_data_node("the_input")
  115. output_config = Config.configure_data_node("the_output")
  116. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  117. sc_config = Config.configure_scenario(
  118. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  119. )
  120. scenario = tp.create_scenario(sc_config)
  121. register_id_0, register_queue_0 = Notifier.register()
  122. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  123. all_evts.start()
  124. # Write input manually trigger 5 data node update events
  125. # for last_edit_date, editor_id, editor_expiration_date, edit_in_progress and edits
  126. scenario.the_input.write("test")
  127. snapshot = all_evts.capture()
  128. assert len(snapshot.collected_events) == 5
  129. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 5
  130. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 5
  131. all_evts.stop()
  132. @pytest.mark.parametrize("standalone", [False, True])
  133. def test_events_published_for_scenario_submission(standalone):
  134. if standalone:
  135. Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
  136. input_config = Config.configure_data_node("the_input")
  137. output_config = Config.configure_data_node("the_output")
  138. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  139. sc_config = Config.configure_scenario(
  140. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  141. )
  142. scenario = tp.create_scenario(sc_config)
  143. scenario.the_input.write("test")
  144. register_id_0, register_queue_0 = Notifier.register()
  145. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  146. all_evts.start()
  147. # Submit a scenario triggers:
  148. # 1 scenario submission event
  149. # 7 dn update events (for last_edit_date, editor_id(x2), editor_expiration_date(x2) and edit_in_progress(x2))
  150. # 1 job creation event
  151. # 3 job update events (for status: PENDING, RUNNING and COMPLETED)
  152. # 1 submission creation event
  153. # 1 submission update event for jobs
  154. # 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
  155. # 1 submission update event for is_completed
  156. if standalone:
  157. Orchestrator().run()
  158. scenario.submit(wait=True)
  159. else:
  160. scenario.submit()
  161. snapshot = all_evts.capture()
  162. assert len(snapshot.collected_events) == 18
  163. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
  164. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 8
  165. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
  166. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
  167. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  168. assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
  169. assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 5
  170. assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
  171. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 15
  172. assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1
  173. assert snapshot.attr_name_collected["last_edit_date"] == 1
  174. assert snapshot.attr_name_collected["editor_id"] == 2
  175. assert snapshot.attr_name_collected["editor_expiration_date"] == 2
  176. assert snapshot.attr_name_collected["edit_in_progress"] == 2
  177. assert snapshot.attr_name_collected["edits"] == 1
  178. assert snapshot.attr_name_collected["status"] == 3
  179. assert snapshot.attr_name_collected["jobs"] == 1
  180. assert snapshot.attr_name_collected["submission_status"] == 3
  181. all_evts.stop()
  182. def test_events_published_for_scenario_deletion():
  183. input_config = Config.configure_data_node("the_input")
  184. output_config = Config.configure_data_node("the_output")
  185. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  186. sc_config = Config.configure_scenario(
  187. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  188. )
  189. scenario = tp.create_scenario(sc_config)
  190. scenario.the_input.write("test")
  191. scenario.submit()
  192. register_id_0, register_queue_0 = Notifier.register()
  193. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  194. all_evts.start()
  195. # Delete a scenario trigger 8 deletion events
  196. # 1 scenario deletion event
  197. # 1 cycle deletion event
  198. # 2 dn deletion events (for input and output)
  199. # 1 task deletion event
  200. # 1 sequence deletion event
  201. # 1 job deletion event
  202. # 1 submission deletion event
  203. tp.delete(scenario.id)
  204. snapshot = all_evts.capture()
  205. assert len(snapshot.collected_events) == 8
  206. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 1
  207. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 2
  208. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 1
  209. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 1
  210. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  211. assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 1
  212. assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 1
  213. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 0
  214. assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 0
  215. assert snapshot.operation_collected.get(EventOperation.DELETION, 0) == 8
  216. all_evts.stop()
  217. def test_job_events():
  218. input_config = Config.configure_data_node("the_input")
  219. output_config = Config.configure_data_node("the_output")
  220. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  221. sc_config = Config.configure_scenario(
  222. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  223. )
  224. register_id, register_queue = Notifier.register(entity_type=EventEntityType.JOB)
  225. consumer = RecordingConsumer(register_id, register_queue)
  226. consumer.start()
  227. # Create scenario
  228. scenario = _ScenarioManagerFactory._build_manager()._create(sc_config)
  229. snapshot = consumer.capture()
  230. assert len(snapshot.collected_events) == 0
  231. # Submit scenario
  232. scenario.submit()
  233. snapshot = consumer.capture()
  234. # 2 events expected: one for creation, another for status update
  235. assert len(snapshot.collected_events) == 2
  236. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  237. assert snapshot.collected_events[0].entity_type == EventEntityType.JOB
  238. assert snapshot.collected_events[0].metadata.get("task_config_id") == task_config.id
  239. assert snapshot.collected_events[1].operation == EventOperation.UPDATE
  240. assert snapshot.collected_events[1].entity_type == EventEntityType.JOB
  241. assert snapshot.collected_events[1].metadata.get("task_config_id") == task_config.id
  242. assert snapshot.collected_events[1].attribute_name == "status"
  243. assert snapshot.collected_events[1].attribute_value == Status.BLOCKED
  244. job = tp.get_jobs()[0]
  245. tp.cancel_job(job)
  246. snapshot = consumer.capture()
  247. assert len(snapshot.collected_events) == 1
  248. event = snapshot.collected_events[0]
  249. assert event.metadata.get("task_config_id") == task_config.id
  250. assert event.attribute_name == "status"
  251. assert event.attribute_value == Status.CANCELED
  252. consumer.stop()
  253. def test_scenario_events():
  254. input_config = Config.configure_data_node("the_input")
  255. output_config = Config.configure_data_node("the_output")
  256. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  257. sc_config = Config.configure_scenario(
  258. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  259. )
  260. register_id, register_queue = Notifier.register(entity_type=EventEntityType.SCENARIO)
  261. consumer = RecordingConsumer(register_id, register_queue)
  262. consumer.start()
  263. scenario = tp.create_scenario(sc_config)
  264. snapshot = consumer.capture()
  265. assert len(snapshot.collected_events) == 1
  266. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  267. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  268. assert snapshot.collected_events[0].metadata.get("config_id") == scenario.config_id
  269. scenario.submit()
  270. snapshot = consumer.capture()
  271. assert len(snapshot.collected_events) == 1
  272. assert snapshot.collected_events[0].operation == EventOperation.SUBMISSION
  273. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  274. assert snapshot.collected_events[0].metadata.get("config_id") == scenario.config_id
  275. # Delete scenario
  276. tp.delete(scenario.id)
  277. snapshot = consumer.capture()
  278. assert len(snapshot.collected_events) == 1
  279. assert snapshot.collected_events[0].operation == EventOperation.DELETION
  280. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  281. consumer.stop()
  282. def test_data_node_events():
  283. input_config = Config.configure_data_node("the_input")
  284. output_config = Config.configure_data_node("the_output")
  285. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  286. sc_config = Config.configure_scenario(
  287. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  288. )
  289. register_id, register_queue = Notifier.register(entity_type=EventEntityType.DATA_NODE)
  290. consumer = RecordingConsumer(register_id, register_queue)
  291. consumer.start()
  292. scenario = _ScenarioManagerFactory._build_manager()._create(sc_config)
  293. snapshot = consumer.capture()
  294. # We expect two creation events since we have two data nodes:
  295. assert len(snapshot.collected_events) == 2
  296. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  297. assert snapshot.collected_events[0].entity_type == EventEntityType.DATA_NODE
  298. assert snapshot.collected_events[0].metadata.get("config_id") in [output_config.id, input_config.id]
  299. assert snapshot.collected_events[1].operation == EventOperation.CREATION
  300. assert snapshot.collected_events[1].entity_type == EventEntityType.DATA_NODE
  301. assert snapshot.collected_events[1].metadata.get("config_id") in [output_config.id, input_config.id]
  302. # Delete scenario
  303. tp.delete(scenario.id)
  304. snapshot = consumer.capture()
  305. assert len(snapshot.collected_events) == 2
  306. assert snapshot.collected_events[0].operation == EventOperation.DELETION
  307. assert snapshot.collected_events[0].entity_type == EventEntityType.DATA_NODE
  308. assert snapshot.collected_events[1].operation == EventOperation.DELETION
  309. assert snapshot.collected_events[1].entity_type == EventEntityType.DATA_NODE
  310. consumer.stop()