test_consumer__process_event.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from typing import Any, Dict, List, Optional
  12. from unittest import mock
  13. from taipy import Gui, Scenario
  14. from taipy.core.notification import Event, EventEntityType, EventOperation, _Topic
  15. from taipy.event._event_callback import _Callback
  16. from taipy.event.event_consumer import GuiEventConsumer
  17. collector: Dict[str, Any] = {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
  18. "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}
  19. args_collector: Dict[str, List] = {}
  20. def init_collector():
  21. return {"cb_0": 0, "cb_1": 0, "cb_2": 0, "cb_3": 0, "cb_for_state": 0,
  22. "cb_scenario_creation": 0, "cb_scenario_creation_with_state": 0}, {}
  23. def cb_0(event: Event, gui: Optional[Gui], extra:str):
  24. collector["cb_0"]+=1
  25. if not args_collector.get("cb_0"):
  26. args_collector["cb_0"] = [extra]
  27. else:
  28. args_collector["cb_0"].append(extra)
  29. print(f"event created at {event.creation_date} triggered callback cb_0.") # noqa: T201
  30. def cb_1(event: Event, gui: Optional[Gui]):
  31. collector["cb_1"]+=1
  32. print(f"event created at {event.creation_date} triggered callback cb_1.") # noqa: T201
  33. def cb_2(event: Event, gui: Gui,):
  34. collector["cb_2"]+=1
  35. print(f"event created at {event.creation_date} triggered callback cb_2.") # noqa: T201
  36. def cb_3(event: Event, gui: Gui, ):
  37. collector["cb_3"]+=1
  38. print(f"event created at {event.creation_date} triggered callback cb_3.") # noqa: T201
  39. def cb_for_state(state, event: Event):
  40. collector["cb_for_state"]+=1
  41. print(f"event created at {event.creation_date} triggered callback cb_for_state.") # noqa: T201
  42. def cb_scenario_creation(event: Event, scenario: Scenario, gui: Gui, extra_arg: str):
  43. collector["cb_scenario_creation"]+=1
  44. print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.") # noqa: T201
  45. def cb_scenario_creation_with_state(state, event: Event, scenario: Scenario, extra_arg: str):
  46. collector["cb_scenario_creation_with_state"]+=1
  47. print(f"scenario {scenario.id} created at {event.creation_date} with {extra_arg}.") # noqa: T201
  48. def test_process_event(scenario):
  49. global collector
  50. global args_collector
  51. consumer = GuiEventConsumer()
  52. consumer.on_event(callback=cb_0, callback_args=["foo"])
  53. consumer.on_event(callback=cb_1, entity_type=EventEntityType.SCENARIO)
  54. consumer.on_event(callback=cb_2, entity_type=EventEntityType.SCENARIO, entity_id="bar")
  55. consumer.on_event(callback=cb_3, operation=EventOperation.CREATION)
  56. consumer.on_event(callback=cb_0, callback_args=["baz"], operation=EventOperation.CREATION)
  57. consumer.on_event(callback=cb_1, entity_type=EventEntityType.SEQUENCE, operation=EventOperation.SUBMISSION)
  58. consumer.on_event(callback=cb_1, entity_type=EventEntityType.JOB,
  59. operation=EventOperation.UPDATE, attribute_name="status")
  60. collector, args_collector = init_collector()
  61. event_1 = Event(
  62. entity_type=EventEntityType.SCENARIO,
  63. operation=EventOperation.CREATION,
  64. entity_id="bar",
  65. attribute_name=None,
  66. attribute_value=None,
  67. metadata={},
  68. )
  69. consumer.process_event(event_1)
  70. assert collector["cb_0"] == 2
  71. assert collector["cb_1"] == 1
  72. assert collector["cb_2"] == 1
  73. assert collector["cb_3"] == 1
  74. collector, args_collector = init_collector()
  75. event_2 = Event(
  76. entity_type=EventEntityType.SEQUENCE,
  77. operation=EventOperation.SUBMISSION,
  78. entity_id="quux",
  79. attribute_name=None,
  80. attribute_value=None,
  81. metadata={},
  82. )
  83. consumer.process_event(event_2)
  84. assert collector["cb_0"] == 1
  85. assert collector["cb_1"] == 1
  86. assert collector["cb_2"] == 0
  87. assert collector["cb_3"] == 0
  88. collector, args_collector = init_collector()
  89. collector, args_collector = init_collector()
  90. event_3 = Event(
  91. entity_type=EventEntityType.JOB,
  92. operation=EventOperation.UPDATE,
  93. entity_id="corge",
  94. attribute_name="status",
  95. attribute_value="COMPLETED",
  96. metadata={},
  97. )
  98. consumer.process_event(event_3)
  99. assert collector["cb_0"] == 1
  100. assert collector["cb_1"] == 1
  101. assert collector["cb_2"] == 0
  102. assert collector["cb_3"] == 0
  103. collector, args_collector = init_collector()
  104. def test_process_event_with_state():
  105. consumer = GuiEventConsumer(gui=Gui())
  106. consumer.broadcast_on_event(callback=cb_for_state)
  107. event_1 = Event(
  108. entity_type=EventEntityType.SCENARIO,
  109. operation=EventOperation.CREATION,
  110. entity_id="foo",
  111. attribute_name=None,
  112. attribute_value=None,
  113. metadata={},
  114. )
  115. with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
  116. consumer.process_event(event_1)
  117. mock_broadcast.assert_called_once_with(cb_for_state, [event_1])
  118. def test_process_event_with_filter():
  119. global collector
  120. global args_collector
  121. def filt(event: Event) -> bool:
  122. return event.metadata.get("foo") == "bar"
  123. consumer = GuiEventConsumer()
  124. consumer.on_event(callback=cb_0,
  125. callback_args=["foo"],
  126. entity_type=EventEntityType.SCENARIO,
  127. operation=EventOperation.CREATION,
  128. filter=filt)
  129. topic = _Topic(entity_type=EventEntityType.SCENARIO, operation=EventOperation.CREATION)
  130. assert len(consumer._topic_callbacks_map) == 1
  131. assert consumer._topic_callbacks_map[topic] == [_Callback(cb_0, ["foo"], False, filt)]
  132. collector, args_collector = init_collector()
  133. event_matching_filter = Event(
  134. entity_type=EventEntityType.SCENARIO,
  135. operation=EventOperation.CREATION,
  136. metadata={"foo": "bar"},
  137. )
  138. consumer.process_event(event_matching_filter)
  139. assert collector["cb_0"] == 1
  140. collector, args_collector = init_collector()
  141. event_not_matching_filter = Event(
  142. entity_type=EventEntityType.SCENARIO,
  143. operation=EventOperation.CREATION,
  144. metadata={"baz": "qux"},
  145. )
  146. consumer.process_event(event_not_matching_filter)
  147. assert collector["cb_0"] == 0
  148. collector, args_collector = init_collector()
  149. def test_process_event_with_predefined_args(scenario):
  150. global collector
  151. global args_collector
  152. consumer = GuiEventConsumer()
  153. consumer.on_event(callback=cb_scenario_creation, callback_args=["foo"])
  154. collector, args_collector = init_collector()
  155. event = Event(
  156. entity_type=EventEntityType.SCENARIO,
  157. operation=EventOperation.CREATION,
  158. entity_id="foo",
  159. attribute_name=None,
  160. attribute_value=None,
  161. metadata={"predefined_args": [scenario]},
  162. )
  163. consumer.process_event(event)
  164. assert collector["cb_scenario_creation"] == 1
  165. collector, args_collector = init_collector()
  166. def test_process_event_with_predefined_args_and_state(scenario):
  167. consumer = GuiEventConsumer(Gui())
  168. consumer.broadcast_on_event(callback=cb_scenario_creation_with_state, callback_args=["foo"])
  169. event = Event(
  170. entity_type=EventEntityType.SCENARIO,
  171. operation=EventOperation.CREATION,
  172. entity_id="foo",
  173. attribute_name=None,
  174. attribute_value=None,
  175. metadata={"predefined_args": [scenario]},
  176. )
  177. with mock.patch("taipy.Gui.broadcast_callback") as mock_broadcast:
  178. consumer.process_event(event)
  179. mock_broadcast.assert_called_once_with(cb_scenario_creation_with_state, [event, scenario, "foo"])