test_events_published.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. # Copyright 2023 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from dataclasses import dataclass, field
  12. from math import exp
  13. from queue import SimpleQueue
  14. from colorama import init
  15. from taipy.config import Config, Frequency
  16. from taipy.core import taipy as tp
  17. from taipy.core.config import scenario_config
  18. from taipy.core.job.status import Status
  19. from taipy.core.notification.core_event_consumer import CoreEventConsumerBase
  20. from taipy.core.notification.event import Event, EventEntityType, EventOperation
  21. from taipy.core.notification.notifier import Notifier
  22. from tests.core.utils import assert_true_after_time
  23. class Snapshot:
  24. """
  25. A captured snapshot of the recording core events consumer.
  26. """
  27. def __init__(self):
  28. self.collected_events = []
  29. self.entity_type_collected = {}
  30. self.operation_collected = {}
  31. self.attr_name_collected = {}
  32. def capture_event(self, event):
  33. self.collected_events.append(event)
  34. self.entity_type_collected[event.entity_type] = self.entity_type_collected.get(event.entity_type, 0) + 1
  35. self.operation_collected[event.operation] = self.operation_collected.get(event.operation, 0) + 1
  36. if event.attribute_name:
  37. self.attr_name_collected[event.attribute_name] = self.attr_name_collected.get(event.attribute_name, 0) + 1
  38. class RecordingConsumer(CoreEventConsumerBase):
  39. """
  40. A straightforward and no-thread core events consumer that allows to
  41. capture snapshots of received events.
  42. """
  43. def __init__(self, registration_id: str, queue: SimpleQueue):
  44. super().__init__(registration_id, queue)
  45. def capture(self) -> Snapshot:
  46. """
  47. Capture a snapshot of events received between the previous snapshot
  48. (or from the start of this consumer).
  49. """
  50. snapshot = Snapshot()
  51. while not self.queue.empty():
  52. event = self.queue.get()
  53. snapshot.capture_event(event)
  54. return snapshot
  55. def process_event(self, event: Event):
  56. # Nothing to do
  57. pass
  58. def start(self):
  59. # Nothing to do here
  60. pass
  61. def stop(self):
  62. # Nothing to do here either
  63. pass
  64. def identity(x):
  65. return x
  66. def test_events_published_for_scenario_creation():
  67. input_config = Config.configure_data_node("the_input")
  68. output_config = Config.configure_data_node("the_output")
  69. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  70. sc_config = Config.configure_scenario(
  71. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  72. )
  73. register_id_0, register_queue_0 = Notifier.register()
  74. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  75. all_evts.start()
  76. # Create a scenario only trigger 6 creation events (for cycle, data node(x2), task, sequence and scenario)
  77. tp.create_scenario(sc_config)
  78. snapshot = all_evts.capture()
  79. assert len(snapshot.collected_events) == 6
  80. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 1
  81. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 2
  82. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 1
  83. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 1
  84. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  85. assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 6
  86. all_evts.stop()
  87. def test_no_event_published_for_getting_scenario():
  88. input_config = Config.configure_data_node("the_input")
  89. output_config = Config.configure_data_node("the_output")
  90. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  91. sc_config = Config.configure_scenario(
  92. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  93. )
  94. scenario = tp.create_scenario(sc_config)
  95. register_id_0, register_queue_0 = Notifier.register()
  96. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  97. all_evts.start()
  98. # Get all scenarios does not trigger any event
  99. tp.get_scenarios()
  100. snapshot = all_evts.capture()
  101. assert len(snapshot.collected_events) == 0
  102. # Get one scenario does not trigger any event
  103. tp.get(scenario.id)
  104. snapshot = all_evts.capture()
  105. assert len(snapshot.collected_events) == 0
  106. all_evts.stop()
  107. def test_events_published_for_writing_dn():
  108. input_config = Config.configure_data_node("the_input")
  109. output_config = Config.configure_data_node("the_output")
  110. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  111. sc_config = Config.configure_scenario(
  112. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  113. )
  114. scenario = tp.create_scenario(sc_config)
  115. register_id_0, register_queue_0 = Notifier.register()
  116. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  117. all_evts.start()
  118. # Write input manually trigger 4 data node update events
  119. # for last_edit_date, editor_id, editor_expiration_date and edit_in_progress
  120. scenario.the_input.write("test")
  121. snapshot = all_evts.capture()
  122. assert len(snapshot.collected_events) == 4
  123. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
  124. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 4
  125. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
  126. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
  127. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 0
  128. assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 0
  129. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 4
  130. all_evts.stop()
  131. def test_events_published_for_scenario_submission():
  132. input_config = Config.configure_data_node("the_input")
  133. output_config = Config.configure_data_node("the_output")
  134. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  135. sc_config = Config.configure_scenario(
  136. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  137. )
  138. scenario = tp.create_scenario(sc_config)
  139. scenario.the_input.write("test")
  140. register_id_0, register_queue_0 = Notifier.register()
  141. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  142. all_evts.start()
  143. # Submit a scenario triggers:
  144. # 1 scenario submission event
  145. # 7 dn update events (for last_edit_date, editor_id(x2), editor_expiration_date(x2) and edit_in_progress(x2))
  146. # 1 job creation event
  147. # 3 job update events (for status: PENDING, RUNNING and COMPLETED)
  148. # 1 submission creation event
  149. # 1 submission update event for jobs
  150. # 3 submission update events (for status: PENDING, RUNNING and COMPLETED)
  151. scenario.submit()
  152. snapshot = all_evts.capture()
  153. assert len(snapshot.collected_events) == 17
  154. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 0
  155. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 7
  156. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 0
  157. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 0
  158. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  159. assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 4
  160. assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 5
  161. assert snapshot.operation_collected.get(EventOperation.CREATION, 0) == 2
  162. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 14
  163. assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 1
  164. assert snapshot.attr_name_collected["last_edit_date"] == 1
  165. assert snapshot.attr_name_collected["editor_id"] == 2
  166. assert snapshot.attr_name_collected["editor_expiration_date"] == 2
  167. assert snapshot.attr_name_collected["edit_in_progress"] == 2
  168. assert snapshot.attr_name_collected["status"] == 3
  169. assert snapshot.attr_name_collected["jobs"] == 1
  170. assert snapshot.attr_name_collected["submission_status"] == 3
  171. all_evts.stop()
  172. def test_events_published_for_scenario_deletion():
  173. input_config = Config.configure_data_node("the_input")
  174. output_config = Config.configure_data_node("the_output")
  175. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  176. sc_config = Config.configure_scenario(
  177. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  178. )
  179. scenario = tp.create_scenario(sc_config)
  180. scenario.the_input.write("test")
  181. scenario.submit()
  182. register_id_0, register_queue_0 = Notifier.register()
  183. all_evts = RecordingConsumer(register_id_0, register_queue_0)
  184. all_evts.start()
  185. # Delete a scenario trigger 8 deletion events
  186. # 1 scenario deletion event
  187. # 1 cycle deletion event
  188. # 2 dn deletion events (for input and output)
  189. # 1 task deletion event
  190. # 1 sequence deletion event
  191. # 1 job deletion event
  192. # 1 submission deletion event
  193. tp.delete(scenario.id)
  194. snapshot = all_evts.capture()
  195. assert len(snapshot.collected_events) == 8
  196. assert snapshot.entity_type_collected.get(EventEntityType.CYCLE, 0) == 1
  197. assert snapshot.entity_type_collected.get(EventEntityType.DATA_NODE, 0) == 2
  198. assert snapshot.entity_type_collected.get(EventEntityType.TASK, 0) == 1
  199. assert snapshot.entity_type_collected.get(EventEntityType.SEQUENCE, 0) == 1
  200. assert snapshot.entity_type_collected.get(EventEntityType.SCENARIO, 0) == 1
  201. assert snapshot.entity_type_collected.get(EventEntityType.SUBMISSION, 0) == 1
  202. assert snapshot.entity_type_collected.get(EventEntityType.JOB, 0) == 1
  203. assert snapshot.operation_collected.get(EventOperation.UPDATE, 0) == 0
  204. assert snapshot.operation_collected.get(EventOperation.SUBMISSION, 0) == 0
  205. assert snapshot.operation_collected.get(EventOperation.DELETION, 0) == 8
  206. all_evts.stop()
  207. def test_job_events():
  208. input_config = Config.configure_data_node("the_input")
  209. output_config = Config.configure_data_node("the_output")
  210. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  211. sc_config = Config.configure_scenario(
  212. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  213. )
  214. register_id, register_queue = Notifier.register(entity_type=EventEntityType.JOB)
  215. consumer = RecordingConsumer(register_id, register_queue)
  216. consumer.start()
  217. # Create scenario
  218. scenario = tp.create_scenario(sc_config)
  219. snapshot = consumer.capture()
  220. assert len(snapshot.collected_events) == 0
  221. # Submit scenario
  222. scenario.submit()
  223. snapshot = consumer.capture()
  224. # 2 events expected: one for creation, another for status update
  225. assert len(snapshot.collected_events) == 2
  226. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  227. assert snapshot.collected_events[0].entity_type == EventEntityType.JOB
  228. assert snapshot.collected_events[0].metadata.get("task_config_id") == task_config.id
  229. assert snapshot.collected_events[1].operation == EventOperation.UPDATE
  230. assert snapshot.collected_events[1].entity_type == EventEntityType.JOB
  231. assert snapshot.collected_events[1].metadata.get("task_config_id") == task_config.id
  232. assert snapshot.collected_events[1].attribute_name == "status"
  233. assert snapshot.collected_events[1].attribute_value == Status.BLOCKED
  234. job = tp.get_jobs()[0]
  235. tp.cancel_job(job)
  236. snapshot = consumer.capture()
  237. assert len(snapshot.collected_events) == 1
  238. event = snapshot.collected_events[0]
  239. assert event.metadata.get("task_config_id") == task_config.id
  240. assert event.attribute_name == "status"
  241. assert event.attribute_value == Status.CANCELED
  242. consumer.stop()
  243. def test_scenario_events():
  244. input_config = Config.configure_data_node("the_input")
  245. output_config = Config.configure_data_node("the_output")
  246. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  247. sc_config = Config.configure_scenario(
  248. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  249. )
  250. register_id, register_queue = Notifier.register(entity_type=EventEntityType.SCENARIO)
  251. consumer = RecordingConsumer(register_id, register_queue)
  252. consumer.start()
  253. scenario = tp.create_scenario(sc_config)
  254. snapshot = consumer.capture()
  255. assert len(snapshot.collected_events) == 1
  256. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  257. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  258. assert snapshot.collected_events[0].metadata.get("config_id") == scenario.config_id
  259. scenario.submit()
  260. snapshot = consumer.capture()
  261. assert len(snapshot.collected_events) == 1
  262. assert snapshot.collected_events[0].operation == EventOperation.SUBMISSION
  263. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  264. assert snapshot.collected_events[0].metadata.get("config_id") == scenario.config_id
  265. # Delete scenario
  266. tp.delete(scenario.id)
  267. snapshot = consumer.capture()
  268. assert len(snapshot.collected_events) == 1
  269. assert snapshot.collected_events[0].operation == EventOperation.DELETION
  270. assert snapshot.collected_events[0].entity_type == EventEntityType.SCENARIO
  271. consumer.stop()
  272. def test_data_node_events():
  273. input_config = Config.configure_data_node("the_input")
  274. output_config = Config.configure_data_node("the_output")
  275. task_config = Config.configure_task("the_task", identity, input=input_config, output=output_config)
  276. sc_config = Config.configure_scenario(
  277. "the_scenario", task_configs=[task_config], frequency=Frequency.DAILY, sequences={"the_seq": [task_config]}
  278. )
  279. register_id, register_queue = Notifier.register(entity_type=EventEntityType.DATA_NODE)
  280. consumer = RecordingConsumer(register_id, register_queue)
  281. consumer.start()
  282. scenario = tp.create_scenario(sc_config)
  283. snapshot = consumer.capture()
  284. # We expect two creation events since we have two data nodes:
  285. assert len(snapshot.collected_events) == 2
  286. assert snapshot.collected_events[0].operation == EventOperation.CREATION
  287. assert snapshot.collected_events[0].entity_type == EventEntityType.DATA_NODE
  288. assert snapshot.collected_events[0].metadata.get("config_id") in [output_config.id, input_config.id]
  289. assert snapshot.collected_events[1].operation == EventOperation.CREATION
  290. assert snapshot.collected_events[1].entity_type == EventEntityType.DATA_NODE
  291. assert snapshot.collected_events[1].metadata.get("config_id") in [output_config.id, input_config.id]
  292. # Delete scenario
  293. tp.delete(scenario.id)
  294. snapshot = consumer.capture()
  295. assert len(snapshot.collected_events) == 2
  296. assert snapshot.collected_events[0].operation == EventOperation.DELETION
  297. assert snapshot.collected_events[0].entity_type == EventEntityType.DATA_NODE
  298. assert snapshot.collected_events[1].operation == EventOperation.DELETION
  299. assert snapshot.collected_events[1].entity_type == EventEntityType.DATA_NODE
  300. consumer.stop()