event_processor.py 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. from typing import Callable, Dict, List, Optional, Union
  12. from taipy import DataNode, Gui, Scenario, Submission, SubmissionStatus
  13. from taipy.common._check_dependencies import EnterpriseEditionUtils
  14. from taipy.common.logger._taipy_logger import _TaipyLogger
  15. from taipy.core.common._utils import _load_fct
  16. from taipy.core.config import DataNodeConfig, ScenarioConfig, TaskConfig
  17. from taipy.core.notification import (
  18. Event,
  19. EventEntityType,
  20. EventOperation,
  21. Notifier,
  22. _Registration,
  23. _Topic,
  24. )
  25. from taipy.core.notification._core_event_consumer import _CoreEventConsumerBase
  26. from taipy.event._event_callback import _Callback
  27. from taipy.event._event_processor import _AbstractEventProcessor, _EventProcessor
  28. from taipy.event.exceptions.exceptions import NoGuiDefinedInEventProcessor
  29. class EventProcessor(_CoreEventConsumerBase):
  30. """The Taipy event processor service.
  31. This service listens for events in a Taipy application and triggers callback
  32. executions when events matching specific topics are produced. The service handles
  33. both cases where callbacks are broadcast to all states or executed once on the
  34. server side.
  35. The main method to use is `on_event()`, that registers a callback to a topic.
  36. Before starting the event processor service, register each callback to a topic.
  37. The topics are defined by the entity type, the entity id, the operation, and the
  38. attribute name of the events. If an event matching the provided topic is produced,
  39. the callback execution is triggered.
  40. For more information about the event attributes please refer to the `Event^` class.
  41. !!! note "Filters"
  42. For each registered callback, you can specify a custom filter function in addition
  43. to the topic. This is mostly useful when your filter is more complex than the
  44. topic. The filter must accept an event as the only argument and return a
  45. boolean. If the filter returns False on an event, the callback is not triggered.
  46. See an example below.
  47. !!! note "Callback extra arguments"
  48. For each registered callback, you can also specify extra arguments to be passed to
  49. the callback function in addition to the event. The extra arguments must be provided
  50. as a list of values.
  51. !!! note "Broadcast a callback to all states"
  52. When registering a callback, you can specify if the callback is automatically
  53. broadcast to all states. In this case, the first argument of the callback must be
  54. the state otherwise it is the `Gui^` instance. The second argument is the event.
  55. Optionally, the callback can accept more extra arguments (see the `callback_args`
  56. argument).
  57. !!! example
  58. === "One callback to match all events"
  59. ```python
  60. from taipy import Event, EventProcessor, Gui
  61. def event_received(gui: Gui, event: Event):
  62. print(f"Received event created at : {event.creation_date}")
  63. if __name__ == "__main__":
  64. event_processor = EventProcessor()
  65. event_processor.on_event(callback=event_received)
  66. event_processor.start()
  67. ```
  68. === "Two callbacks to match different topics"
  69. ```python
  70. from taipy import Event, EventProcessor, Gui
  71. def on_entity_creation(event: Event, gui: Gui):
  72. print(f" {event.entity_type} entity created at {event.creation_date}")
  73. def on_scenario(event: Event, gui: Gui):
  74. print(f"Scenario '{event.entity_id}' processed for a '{event.operation}' operation.")
  75. if __name__ == "__main__":
  76. event_processor = EventProcessor()
  77. event_processor.on_event(callback=on_entity_creation, operation=EventOperation.CREATION)
  78. event_processor.on_event(callback=on_scenario, entity_type=EventEntityType.SCENARIO)
  79. event_processor.start()
  80. ```
  81. === "Callbacks to be broadcast to all states"
  82. ```python
  83. import taipy as tp
  84. from taipy import Event, EventProcessor, Gui
  85. def event_received(state, event: Event):
  86. scenario = tp.get(event.entity_id)
  87. print(f"Received event created at : {event.creation_date} for scenario '{scenario.name}'.")
  88. if __name__ == "__main__":
  89. gui = Gui()
  90. event_processor = EventProcessor(gui)
  91. event_processor.broadcast_on_event(callback=event_received)
  92. event_processor.start()
  93. tp.run(gui)
  94. ```
  95. === "Two callbacks for scenario creations"
  96. ```python
  97. import taipy as tp
  98. from taipy import Event, EventProcessor, Gui, State
  99. def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
  100. print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
  101. def store_latest_scenario(state: State, event: Event, scenario: Scenario):
  102. print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
  103. state.latest_scenario = scenario
  104. if __name__ == "__main__":
  105. gui = Gui()
  106. event_processor = EventProcessor(gui)
  107. event_processor.on_scenario_created(callback=print_scenario_created)
  108. event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
  109. event_processor.start()
  110. ...
  111. tp.run(gui)
  112. ```
  113. === "With specific filters"
  114. ```python
  115. import taipy as tp
  116. from taipy import Event, EventProcessor, Gui
  117. def cycle_filter(event: Event, gui: Gui):
  118. scenario = tp.get(event.entity_id)
  119. return scenario.cycle.name == "2023"
  120. def event_received(state, event: Event):
  121. scenario = tp.get(event.entity_id)
  122. cycle = scenario.cycle
  123. print(f"Received event for scenario '{scenario.name}' in cycle '{cycle.name}'.")
  124. if __name__ == "__main__":
  125. gui = Gui()
  126. event_processor = EventProcessor(gui)
  127. event_processor.broadcast_on_event(
  128. callback=event_received,
  129. entity_type=EventEntityType.SCENARIO,
  130. filter=cycle_filter)
  131. event_processor.start()
  132. tp.run(gui)
  133. ```
  134. Other methods such as `on_datanode_written()` or `on_submission_finished()` are
  135. utility methods as shortcuts to easily register callbacks for predefined topics and
  136. filters.
  137. """
  def __init__(self, gui: Optional[Gui] = None) -> None:
      """Initialize the Event Processor service.

      Arguments:
          gui (Optional[Gui]): The Gui instance used to broadcast the callbacks
              to all states. Defaults to None.
      """
      registration = _Registration()
      self._registration = registration
      self._topic_callbacks_map: Dict[_Topic, List[_Callback]] = {}
      self._gui = gui

      # Start with the default processor; it is replaced below when
      # `EnterpriseEditionUtils._using_enterprise()` reports True.
      self.event_processor: _AbstractEventProcessor = _EventProcessor()
      if EnterpriseEditionUtils._using_enterprise():
          enterprise_module = EnterpriseEditionUtils._TAIPY_ENTERPRISE_EVENT_PACKAGE + "._event_processor"
          authorized_processor_cls = _load_fct(enterprise_module, "_AuthorizedEventProcessor")
          self.event_processor = authorized_processor_cls()

      # The consumer base class is set up with the id and queue of the registration.
      super().__init__(registration.registration_id, registration.queue)
  def on_event(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      entity_type: Optional[EventEntityType] = None,
      entity_id: Optional[str] = None,
      operation: Optional[EventOperation] = None,
      attribute_name: Optional[str] = None,
      filter: Optional[Callable[[Event], bool]] = None,
  ) -> "EventProcessor":
      """Register a callback to be executed once when a matching event is produced.

      The topic is made of the entity type, the entity id, the operation and the
      attribute name. Any of them left to None matches every value.

      Arguments:
          callback (callable): The callback to be executed when the event is produced.
              It receives the event as the first argument and the GUI instance as
              the second argument.
              ```python
              def on_event_received(event: Event, gui: Gui):
                  ...
              ```
              Optionally, the callback can accept extra arguments (see the
              `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the event and the GUI.
          entity_type (Optional[EventEntityType]): The entity type of the event.
              If None, the callback is registered for all entity types.
          entity_id (Optional[str]): The entity id of the event.
              If None, the callback is registered for all entities.
          operation (Optional[EventOperation]): The operation of the event.
              If None, the callback is registered for all operations.
          attribute_name (Optional[str]): The attribute name of an update event.
              If None, the callback is registered for all attribute names.
          filter (Optional[Callable[[Event], bool]]): A custom filter to apply to
              the event before triggering the callback. The filter must accept an
              event as the only argument and return a boolean. If the filter
              returns False, the callback is not triggered.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Same registration path as `broadcast_on_event()`, without broadcasting.
      return self.__on_event(
          callback,
          callback_args,
          entity_type,
          entity_id,
          operation,
          attribute_name,
          filter,
          broadcast=False,
      )
  def broadcast_on_event(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      entity_type: Optional[EventEntityType] = None,
      entity_id: Optional[str] = None,
      operation: Optional[EventOperation] = None,
      attribute_name: Optional[str] = None,
      filter: Optional[Callable[[Event], bool]] = None,
  ) -> "EventProcessor":
      """Register a callback to be broadcast to all states when a matching event is produced.

      The topic is made of the entity type, the entity id, the operation and the
      attribute name. Any of them left to None matches every value.

      Arguments:
          callback (callable): The callback to be executed for each state when the
              event is produced. It receives the state as the first argument and
              the event as the second argument.
              ```python
              def on_event_received(state, event: Event):
                  ...
              ```
              Optionally, the callback can accept extra arguments (see the
              `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the state and the event.
          entity_type (Optional[EventEntityType]): The entity type of the event.
              If None, the callback is registered for all entity types.
          entity_id (Optional[str]): The entity id of the event.
              If None, the callback is registered for all entities.
          operation (Optional[EventOperation]): The operation of the event.
              If None, the callback is registered for all operations.
          attribute_name (Optional[str]): The attribute name of an update event.
              If None, the callback is registered for all attribute names.
          filter (Optional[Callable[[Event], bool]]): A custom filter to apply to
              the event before triggering the callback. The filter must accept an
              event as the only argument and return a boolean. If the filter
              returns False, the callback is not triggered.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Same registration path as `on_event()`, with broadcasting enabled.
      return self.__on_event(
          callback,
          callback_args,
          entity_type,
          entity_id,
          operation,
          attribute_name,
          filter,
          broadcast=True,
      )
  def __on_event(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      entity_type: Optional[EventEntityType] = None,
      entity_id: Optional[str] = None,
      operation: Optional[EventOperation] = None,
      attribute_name: Optional[str] = None,
      filter: Optional[Callable[[Event], bool]] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Build a topic and a callback from the arguments and register them together.

      Shared implementation behind `on_event()` and `broadcast_on_event()`.

      Arguments:
          callback (callable): The function to trigger.
          callback_args (Optional[List]): The extra arguments for the callback.
          entity_type (Optional[EventEntityType]): The entity type part of the topic.
          entity_id (Optional[str]): The entity id part of the topic.
          operation (Optional[EventOperation]): The operation part of the topic.
          attribute_name (Optional[str]): The attribute name part of the topic.
          filter (Optional[Callable[[Event], bool]]): The custom event filter.
          broadcast (bool): Forwarded to the callback builder together with the
              callback, its extra arguments and the filter.

      Returns:
          EventProcessor: The current instance, so that registrations can be chained.
      """
      # The topic is built before the callback, as arguments are evaluated in order.
      self.__register_callback(
          self.__build_topic(entity_type, entity_id, operation, attribute_name),
          self.__build_callback(callback, callback_args, filter, broadcast),
      )
      return self
  def on_scenario_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for scenario creation events.

      !!! example

          === "A callback for all scenario creations"

              ```python
              import taipy as tp
              from taipy import Event, EventProcessor, Gui, Scenario

              def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
                  print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

              if __name__ == "__main__":
                  gui = Gui()
                  event_processor = EventProcessor(gui)
                  event_processor.on_scenario_created(callback=print_scenario_created)
                  event_processor.start()
                  ...
                  tp.run(gui)
              ```

          === "One callback for a specific scenario configuration"

              ```python
              from taipy import Event, EventProcessor, Gui, Scenario

              def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
                  print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

              if __name__ == "__main__":
                  event_processor = EventProcessor()
                  event_processor.on_scenario_created(callback=print_scenario_created, scenario_config="my_cfg")
                  event_processor.start()
                  ...
              ```

      Arguments:
          callback (callable): The callback to be executed when consuming the event.
              ```python
              def on_event_received(event: Event, scenario: Scenario, gui: Gui):
                  ...
              ```
              The callback is triggered when a scenario is created. It takes the
              event, the scenario, and the GUI instance as arguments. It can also
              accept extra arguments (see the `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the event, the scenario and the GUI.
          scenario_config (Union[str, ScenarioConfig, List, None]): The optional
              scenario configuration ids or scenario configurations for which the
              callback is registered. If None, the callback is registered for all
              scenario configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast variant of the scenario creation registration.
      return self.__on_scenario_created(callback, callback_args, scenario_config, broadcast=False)
  def broadcast_on_scenario_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback executed for all states on scenario creation events.

      !!! example

          === "A callback for all scenario creations"

              ```python
              import taipy as tp
              from taipy import Event, EventProcessor, Gui, Scenario, State

              def store_latest_scenario(state: State, event: Event, scenario: Scenario):
                  print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
                  state.latest_scenario = scenario

              if __name__ == "__main__":
                  gui = Gui()
                  event_processor = EventProcessor(gui)
                  event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
                  event_processor.start()
                  ...
                  tp.run(gui)
              ```

          === "One callback for a specific scenario configuration"

              ```python
              import taipy as tp
              from taipy import Event, EventProcessor, Gui, Scenario

              def scenario_created(state, event: Event, scenario: Scenario):
                  print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
                  state.latest_scenario = scenario

              if __name__ == "__main__":
                  gui = Gui()
                  event_processor = EventProcessor(gui)
                  event_processor.broadcast_on_scenario_created(callback=scenario_created, scenario_config="my_cfg")
                  event_processor.start()
                  ...
                  tp.run(gui)
              ```

      Arguments:
          callback (callable): The callback to be executed for each state when
              a scenario creation event occurs.
              ```python
              def on_event_received(state: State, event: Event, scenario: Scenario):
                  ...
              ```
              The callback takes the state, the event, and the scenario as
              arguments. Optionally, the callback can accept extra arguments (see
              the `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the state, the event, and the scenario.
          scenario_config (Union[str, ScenarioConfig, List, None]): The optional
              scenario configuration ids or scenario configurations for which the
              callback is registered. If None, the callback is registered for all
              scenario configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast variant of the scenario creation registration.
      return self.__on_scenario_created(callback, callback_args, scenario_config, broadcast=True)
  def __on_scenario_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on scenario CREATION events, optionally limited to some configurations.

      Shared implementation behind `on_scenario_created()` and
      `broadcast_on_scenario_created()`.

      Arguments:
          callback (callable): The function to trigger.
          callback_args (Optional[List]): The extra arguments for the callback.
          scenario_config (Union[str, ScenarioConfig, List, None]): The scenario
              configurations (or their ids) to restrict the callback to.
          broadcast (bool): Forwarded unchanged to the event registration.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Normalized by a helper defined elsewhere in the class; presumably a
      # collection of configuration ids -- confirm against the helper.
      accepted_config_ids = self.__format_configs_parameter(ScenarioConfig, scenario_config)

      def _filter(event: Event) -> bool:
          # An event without an entity id cannot be resolved to a scenario.
          if not event.entity_id:
              return False
          import taipy as tp

          entity = tp.get(event.entity_id)
          if not isinstance(entity, Scenario):
              return False
          # No restriction configured, or the scenario comes from an accepted configuration.
          accepted = not accepted_config_ids or entity.config_id in accepted_config_ids  # type: ignore[union-attr]
          if accepted:
              # Stored on the event under "predefined_args" for the callback machinery.
              event.metadata["predefined_args"] = [entity]
          return accepted

      # `__on_event()` returns the current instance.
      return self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.SCENARIO,
          operation=EventOperation.CREATION,
          filter=_filter,
          broadcast=broadcast,
      )
  def on_scenario_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for scenario deletion events.

      !!! example

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui

          def print_scenario_deleted(event: Event, scenario_id: str, gui: Gui):
              print(f"A scenario has been deleted at '{event.creation_date}'.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.on_scenario_deleted(callback=print_scenario_deleted)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed on scenario deletion event.
              ```python
              def on_event_received(event: Event, scenario_id: str, gui: Gui):
                  ...
              ```
              The callback takes the event, the scenario id, and the GUI instance
              as arguments. Optionally, it can also accept extra arguments (see
              the `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the event, the scenario id, and the GUI.
          scenario_config (Union[str, ScenarioConfig, List, None]): The optional
              scenario configuration ids or scenario configurations for which the
              callback is registered. If None, the callback is registered for all
              scenario configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast variant of the scenario deletion registration.
      return self.__on_scenario_deleted(
          callback=callback,
          callback_args=callback_args,
          scenario_config=scenario_config,
          broadcast=False,
      )
  def broadcast_on_scenario_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback executed for all states on scenario deletion events.

      !!! example

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State
          from taipy.gui import notify

          def on_scenario_deleted(state: State, event: Event, scenario_id: str):
              notify(state, f"A scenario has been deleted at '{event.creation_date}'.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.broadcast_on_scenario_deleted(callback=on_scenario_deleted)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed for each state on
              scenario deletion event.
              ```python
              def on_event_received(state: State, event: Event, scenario_id: str):
                  ...
              ```
              The callback takes the state, the event, and the scenario id as
              arguments. Optionally, it can also accept extra arguments (see the
              `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the state, the event, and the
              scenario id.
          scenario_config (Union[str, ScenarioConfig, List, None]): The optional
              scenario configuration ids or scenario configurations for which the
              callback is registered. If None, the callback is registered for all
              scenario configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast variant of the scenario deletion registration.
      return self.__on_scenario_deleted(callback, callback_args, scenario_config, broadcast=True)
  def __on_scenario_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      scenario_config: Union[str, ScenarioConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on scenario DELETION events, optionally limited to some configurations.

      Shared implementation behind `on_scenario_deleted()` and
      `broadcast_on_scenario_deleted()`. The filter works on the entity id of
      the event only and does not load the scenario.

      Arguments:
          callback (callable): The function to trigger.
          callback_args (Optional[List]): The extra arguments for the callback.
          scenario_config (Union[str, ScenarioConfig, List, None]): The scenario
              configurations (or their ids) to restrict the callback to.
          broadcast (bool): Forwarded unchanged to the event registration.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Normalized by a helper defined elsewhere in the class; presumably a
      # collection of configuration ids -- confirm against the helper.
      accepted_config_ids = self.__format_configs_parameter(ScenarioConfig, scenario_config)

      def _filter(event: Event) -> bool:
          # NOTE(review): the configuration is matched as a substring of the
          # entity id, so a short configuration id may also match scenarios of
          # another configuration -- confirm this is intended.
          accepted = not accepted_config_ids or any(
              cfg_id in str(event.entity_id) for cfg_id in accepted_config_ids
          )
          if accepted:
              # The callback receives the id of the deleted scenario.
              event.metadata["predefined_args"] = [event.entity_id]
          return accepted

      # `__on_event()` returns the current instance.
      return self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.SCENARIO,
          operation=EventOperation.DELETION,
          filter=_filter,
          broadcast=broadcast,
      )
  def on_datanode_written(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for data node written events.

      The callback is triggered when a datanode is written (see methods
      `DataNode.write()^` or `DataNode.append()^`).

      !!! example

          ```python
          from typing import Any

          import taipy as tp
          from taipy import DataNode, Event, EventProcessor, Gui

          def last_data_edition(event: Event, datanode: DataNode, data: Any, gui: Gui):
              print(f"Datanode '{datanode.id}' written at '{event.creation_date}'.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.on_datanode_written(callback=last_data_edition)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed when consuming the event.
              ```python
              def on_event_received(event: Event,
                                    datanode: DataNode,
                                    data: Any,
                                    gui: Gui):
                  ...
              ```
              The callback takes the event, the datanode, the data, and the GUI
              instance as arguments. Optionally, the callback can accept extra
              arguments (see the `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the event, the datanode, the data,
              and the GUI.
          datanode_config (Union[str, DataNodeConfig, List, None]): The optional
              datanode configuration ids or datanode configurations for which the
              callback is registered. If None, the callback is registered for all
              datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast variant; use `broadcast_on_datanode_written()` to run the
      # callback for every state instead.
      return self.__on_datanode_written(
          callback=callback,
          callback_args=callback_args,
          datanode_config=datanode_config,
          broadcast=False,
      )
  def broadcast_on_datanode_written(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback executed for all states on data node written events.

      The callback is triggered when a datanode is written (see methods
      `DataNode.write()^` or `DataNode.append()^`).

      !!! example

          ```python
          from typing import Any

          import taipy as tp
          from taipy import DataNode, Event, EventProcessor, Gui, State

          def last_data_edition(state: State, event: Event, datanode: DataNode, data: Any):
              print(f"Datanode written at '{event.creation_date}'.")
              state.last_data_edition.append(datanode.id)

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.broadcast_on_datanode_written(callback=last_data_edition)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed for all states on data
              node written events.
              ```python
              def on_event_received(state: State, event: Event, datanode: DataNode, data: Any):
                  ...
              ```
              The callback takes the state, the event, the datanode, and the data
              as arguments. Optionally, the callback can accept extra arguments
              (see the `callback_args` argument).
          callback_args (Optional[List]): The extra arguments to be passed to the
              callback function in addition to the state, the event, the datanode,
              and the data.
          datanode_config (Union[str, DataNodeConfig, List, None]): The optional
              datanode configuration ids or datanode configurations for which the
              callback is registered. If None, the callback is registered for all
              datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast variant of the data node written registration.
      return self.__on_datanode_written(
          callback=callback,
          callback_args=callback_args,
          datanode_config=datanode_config,
          broadcast=True,
      )
  def __on_datanode_written(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on data node written events, optionally limited to some configurations.

      Shared implementation behind `on_datanode_written()` and
      `broadcast_on_datanode_written()`. The registration targets UPDATE events
      on the `last_edit_date` attribute of data nodes.

      Arguments:
          callback (callable): The function to trigger.
          callback_args (Optional[List]): The extra arguments for the callback.
          datanode_config (Union[str, DataNodeConfig, List, None]): The datanode
              configurations (or their ids) to restrict the callback to.
          broadcast (bool): Forwarded unchanged to the event registration.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Normalized by a helper defined elsewhere in the class; presumably a
      # collection of configuration ids -- confirm against the helper.
      accepted_config_ids = self.__format_configs_parameter(DataNodeConfig, datanode_config)

      def _filter(event: Event) -> bool:
          # An event without an entity id cannot be resolved to a data node.
          if not event.entity_id:
              return False
          import taipy as tp

          entity = tp.get(event.entity_id)
          if not isinstance(entity, DataNode):
              return False
          if accepted_config_ids and entity.config_id not in accepted_config_ids:
              return False
          # The data is read only once the event is accepted; the callback gets
          # both the data node and its current data.
          event.metadata["predefined_args"] = [entity, entity.read()]
          return True

      # `__on_event()` returns the current instance.
      return self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.DATA_NODE,
          operation=EventOperation.UPDATE,
          attribute_name="last_edit_date",
          filter=_filter,
          broadcast=broadcast,
      )
  def on_datanode_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for data node deletion events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State

          def record_deletions(event: Event, datanode_id: str, gui: Gui):
              print(f"Datanode deleted at '{event.creation_date}'.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.on_datanode_deleted(callback=record_deletions)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed when consuming the event.
              ```python
              def on_event_received(event: Event, datanode_id: str, gui: Gui):
                  ...
              ```
              The callback takes the event, the datanode id, and the GUI instance as
              arguments. Optionally, it can accept extra arguments (see the
              `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the event, the datanode id, and the GUI.
          datanode_config (Union[str, DataNodeConfig, List, None]): The
              optional datanode configuration ids or datanode configurations
              for which the callback is registered. If None, the callback is registered
              for all datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast flavour: the callback is invoked once per event.
      return self.__on_datanode_deleted(callback, callback_args, datanode_config, broadcast=False)
  def broadcast_on_datanode_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for each state on data node deletion events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State

          def record_deletions(state: State, event: Event, datanode_id: str):
              print(f"Datanode deleted at '{event.creation_date}'.")
              state.deleted_datanodes.append(datanode_id)

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.broadcast_on_datanode_deleted(callback=record_deletions)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed for each state on data node
              deletion events.
              ```python
              def on_event_received(state: State, event: Event, datanode_id: str):
                  ...
              ```
              The callback takes the state, the event, and the datanode id as
              arguments. Optionally, it can accept extra arguments (see the
              `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the state, the event, and the datanode id.
          datanode_config (Union[str, DataNodeConfig, List, None]): The
              optional datanode configuration ids or datanode configurations
              for which the callback is registered. If None, the callback is registered
              for all datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast flavour: the callback is dispatched through the GUI.
      return self.__on_datanode_deleted(callback, callback_args, datanode_config, broadcast=True)
  def __on_datanode_deleted(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on data node deletion events.

      The accepted events carry the deleted data node id as the single
      predefined callback argument.

      Arguments:
          callback (callable): The function to execute for each accepted event.
          callback_args (Optional[List]): The extra arguments handed over to the
              event registration together with the callback.
          datanode_config (Union[str, DataNodeConfig, List, None]): The datanode
              configuration id(s) or configuration(s) the events are restricted
              to. A falsy value accepts every data node deletion.
          broadcast (bool): Forwarded as the `broadcast` flag of the event
              registration.

      Returns:
          EventProcessor: The current instance.
      """
      config_ids = self.__format_configs_parameter(DataNodeConfig, datanode_config)

      def _filter(event: Event) -> bool:
          if config_ids:
              # The filter only has the entity id to work with (presumably the
              # deleted data node can no longer be loaded), so the config id is
              # taken from the id itself.
              # NOTE(review): assumes ids are built as
              # "<PREFIX>_<config_id>_<uuid>", with no underscore in the prefix
              # or in the uuid -- confirm against the DataNode id generation.
              _, _, remainder = str(event.entity_id).partition("_")
              config_id, _, _ = remainder.rpartition("_")
              # Exact comparison: a plain substring test would let "sales" match
              # "sales_history", and "DATA" match every data node id.
              if config_id not in config_ids:
                  return False
          event.metadata["predefined_args"] = [event.entity_id]
          return True

      self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.DATA_NODE,
          operation=EventOperation.DELETION,
          filter=_filter,
          broadcast=broadcast,
      )
      return self
  def on_datanode_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback to be executed on data node creation events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State

          def record_creations(event: Event, datanode: DataNode, gui: Gui):
              print(f"Datanode created at '{event.creation_date}'.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.on_datanode_created(callback=record_creations)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed on data node creation events.
              ```python
              def on_event_received(event: Event, datanode: DataNode, gui: Gui):
                  ...
              ```
              The callback takes the event, the datanode, and the GUI instance as
              arguments. Optionally, the callback can accept extra arguments (see the
              `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the event, the datanode, and the GUI.
          datanode_config (Union[str, DataNodeConfig, List, None]): The
              optional datanode configuration ids or datanode configurations
              for which the callback is registered. If None, the callback is registered
              for all datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast flavour: the callback is invoked once per event.
      return self.__on_datanode_created(callback, callback_args, datanode_config, broadcast=False)
  def broadcast_on_datanode_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback to be executed for each state on data node creation events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State
          from taipy.gui import notify

          def record_creations(state: State, event: Event, datanode: DataNode):
              print(f"Datanode created at '{event.creation_date}'.")
              notify(state, f"Datanode '{datanode.id}' created.")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.broadcast_on_datanode_created(callback=record_creations)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed for each state on data
              node creation events.
              ```python
              def on_event_received(state: State, event: Event, datanode: DataNode):
                  ...
              ```
              The callback takes the state, the event, and the datanode as arguments.
              Optionally, the callback can accept extra arguments (see the
              `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the state, the event, and the datanode.
          datanode_config (Union[str, DataNodeConfig, List, None]): The
              optional datanode configuration ids or datanode configurations
              for which the callback is registered. If None, the callback is registered
              for all datanode configurations.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast flavour: the callback is dispatched through the GUI.
      return self.__on_datanode_created(callback, callback_args, datanode_config, broadcast=True)
  def __on_datanode_created(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      datanode_config: Union[str, DataNodeConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on data node creation events.

      Arguments:
          callback (callable): The function to execute for each accepted event.
          callback_args (Optional[List]): The extra arguments handed over to the
              event registration together with the callback.
          datanode_config (Union[str, DataNodeConfig, List, None]): The datanode
              configuration id(s) or configuration(s) the events are restricted
              to. A falsy value disables the restriction.
          broadcast (bool): Forwarded as the `broadcast` flag of the event
              registration.

      Returns:
          EventProcessor: The current instance.
      """
      allowed_config_ids = self.__format_configs_parameter(DataNodeConfig, datanode_config)

      def _matches(event: Event) -> bool:
          entity_id = event.entity_id
          # Without an entity id there is nothing to look up.
          if not entity_id:
              return False
          import taipy as tp

          entity = tp.get(entity_id)
          if not isinstance(entity, DataNode):
              return False
          if allowed_config_ids and entity.config_id not in allowed_config_ids:
              return False
          # The created data node is the only predefined callback argument.
          event.metadata["predefined_args"] = [entity]
          return True

      self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.DATA_NODE,
          operation=EventOperation.CREATION,
          filter=_matches,
          broadcast=broadcast,
      )
      return self
  def on_submission_finished(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback for submission finished events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State

          def record_submissions(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
              if submission.submission_status == SubmissionStatus.COMPLETED:
                  print(f"Submission completed at '{event.creation_date}'. Status: '{submission.submission_status}'")
              elif submission.submission_status == SubmissionStatus.FAILED:
                  print(f"Submission failed at '{event.creation_date}'. Status: '{submission.submission_status}'")

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.on_submission_finished(callback=record_submissions)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed on submission finished
              events.
              ```python
              def on_event_received(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
                  ...
              ```
              The callback takes the event, the submittable (scenario, sequence or task),
              the submission, and the GUI instance as arguments. Optionally, the
              callback can accept extra arguments (see the `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the event, the submittable, the submission, and the GUI.
          config_ids (Union[str, ScenarioConfig, TaskConfig, List, None]): The
              optional scenario configuration ids or task configuration ids or scenario
              configurations or task configurations for which the callback is registered.
              If None, the callback is registered for any submittable.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Non-broadcast flavour: the callback is invoked once per event.
      return self.__on_submission_finished(callback, callback_args, config_ids, broadcast=False)
  def broadcast_on_submission_finished(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
  ) -> "EventProcessor":
      """Register a callback to be executed for each state on submission finished events.

      !!! example:

          ```python
          import taipy as tp
          from taipy import Event, EventProcessor, Gui, State

          def record_submissions(state: State, event: Event, submittable: Submittable, submission: Submission):
              print(f"Submission finished at '{event.creation_date}'. Status: '{submission.submission_status}'")
              if submission.submission_status == SubmissionStatus.COMPLETED:
                  state.completed.append(submittable.id)
              elif submission.submission_status == SubmissionStatus.FAILED:
                  state.failed.append(submittable.id)

          if __name__ == "__main__":
              gui = Gui()
              event_processor = EventProcessor(gui)
              event_processor.broadcast_on_submission_finished(callback=record_submissions)
              event_processor.start()
              ...
              tp.run(gui)
          ```

      Arguments:
          callback (callable): The callback to be executed for each state on submission
              finished events.
              ```python
              def on_event_received(state: State, event: Event, submittable: Submittable, submission: Submission):
                  ...
              ```
              The callback takes the state, the event, the submittable (scenario, sequence
              or task), and the submission. Optionally, the callback can accept extra
              arguments (see the `callback_args` argument).
          callback_args (List[AnyOf]): The extra arguments to be passed to the callback
              function in addition to the state, the event, the submittable, and the
              submission.
          config_ids (Union[str, ScenarioConfig, TaskConfig, List, None]): The
              optional scenario configuration ids or task configuration ids or scenario
              configurations or task configurations for which the callback is registered.
              If None, the callback is registered for any submittable.

      Returns:
          EventProcessor: The current instance of the `EventProcessor` service.
      """
      # Broadcast flavour: the callback is dispatched through the GUI.
      return self.__on_submission_finished(callback, callback_args, config_ids, broadcast=True)
  def __on_submission_finished(
      self,
      callback: Callable,
      callback_args: Optional[List] = None,
      config_ids: Union[str, ScenarioConfig, TaskConfig, List, None] = None,
      broadcast: bool = False,
  ) -> "EventProcessor":
      """Register `callback` on submission status updates that reach a final status.

      An event is accepted when its attribute value is one of the COMPLETED,
      FAILED or CANCELED submission statuses and its entity id resolves to a
      `Submission`. The submittable and the submission are then exposed as the
      predefined callback arguments, in that order.

      Arguments:
          callback (callable): The function to execute for each accepted event.
          callback_args (Optional[List]): The extra arguments handed over to the
              event registration together with the callback.
          config_ids (Union[str, ScenarioConfig, TaskConfig, List, None]): The
              scenario/task configuration id(s) or configuration(s) the events are
              restricted to. A falsy value disables the restriction.
          broadcast (bool): Forwarded as the `broadcast` flag of the event
              registration.

      Returns:
          EventProcessor: The current instance.
      """
      config_types = (TaskConfig, ScenarioConfig)

      # Normalise the accepted input shapes to a list of configuration ids.
      if isinstance(config_ids, str):
          config_ids = [config_ids]
      elif isinstance(config_ids, config_types):
          config_ids = [config_ids.id]
      elif isinstance(config_ids, list):
          config_ids = [item.id if isinstance(item, config_types) else item for item in config_ids]

      final_statuses = {SubmissionStatus.COMPLETED, SubmissionStatus.FAILED, SubmissionStatus.CANCELED}

      def _is_finished_submission(event: Event) -> bool:
          if not event.entity_id:
              return False
          status = event.attribute_value
          if not status or status not in final_statuses:
              return False
          import taipy as tp

          submission = tp.get(event.entity_id)
          if not isinstance(submission, Submission):
              return False
          if config_ids:
              # We are filtering on a specific config
              submitted_config_id = submission.entity_config_id
              # A submission for a sequence does not have a config, and a config
              # that is not in the list is not of interest.
              if not submitted_config_id or submitted_config_id not in config_ids:
                  return False
          submittable = tp.get(submission.entity_id)  # type: ignore[arg-type]
          event.metadata["predefined_args"] = [submittable, submission]
          return True

      self.__on_event(
          callback=callback,
          callback_args=callback_args,
          entity_type=EventEntityType.SUBMISSION,
          operation=EventOperation.UPDATE,
          attribute_name="submission_status",
          filter=_is_finished_submission,
          broadcast=broadcast,
      )
      return self
  def process_event(self, event: Event) -> None:
      """Process an incoming event.

      The work is delegated to `self.event_processor`, which receives this
      instance together with the event.

      Arguments:
          event (Event): The event to be processed.
      """
      delegate = self.event_processor
      delegate.process_event(self, event)
  def start(self):
      """Start the event processor thread.

      The processor's registration is handed to the `Notifier` first, then the
      parent class `start()` is called.
      """
      registration = self._registration
      Notifier._register_from_registration(registration)
      super().start()
  def stop(self):
      """Stop the event processor thread.

      The parent class `stop()` is called first, then the processor's
      registration is removed from the `Notifier`.
      """
      super().stop()
      registration_id = self._registration.registration_id
      Notifier.unregister(registration_id)
  @staticmethod
  def __format_configs_parameter(clazz, parameter) -> Optional[List[str]]:
      """Normalise a "configs" argument to a list of configuration ids.

      Arguments:
          clazz: The configuration class (or tuple of classes) whose instances
              are replaced by their `id` attribute.
          parameter: A configuration id, a configuration instance, a list, tuple
              or set mixing both, or None.

      Returns:
          Optional[List[str]]: The list of ids. None is returned unchanged, as is
              any value of an unsupported type.
      """
      if parameter is None:
          # Callers rely on a falsy value to mean "no restriction".
          return None
      if isinstance(parameter, str):
          return [parameter]
      if isinstance(parameter, clazz):
          return [parameter.id]  # type: ignore[attr-defined]
      if isinstance(parameter, (list, tuple, set)):
          # Tuples and sets are converted too: left as-is, their configuration
          # instances would never compare equal to a configuration id.
          return [cfg.id if isinstance(cfg, clazz) else cfg for cfg in parameter]  # type: ignore[attr-defined]
      return parameter
  def __build_topic(self, entity_type, entity_id, operation, attribute_name):
      """Create a topic, add it to the registration, and return it.

      The values given to the registration are read back from the created
      `_Topic` rather than taken from the raw arguments.
      """
      new_topic = _Topic(entity_type, entity_id, operation, attribute_name)
      topic_fields = {
          "entity_type": new_topic.entity_type,
          "entity_id": new_topic.entity_id,
          "operation": new_topic.operation,
          "attribute_name": new_topic.attribute_name,
      }
      self._registration.add_topic(**topic_fields)
      return new_topic
  def __build_callback(self, callback, callback_args, filter, broadcast):
      """Wrap a callback and its settings in a `_Callback`.

      Arguments:
          callback: The function to wrap.
          callback_args: The extra arguments of the callback. None is replaced
              by an empty list.
          filter: The event filter stored with the callback.
          broadcast: Whether the callback is meant to be broadcast through the GUI.

      Returns:
          _Callback: The wrapped callback.

      Raises:
          NoGuiDefinedInEventProcessor: If `broadcast` is set while the processor
              has no GUI.
      """
      gui_missing = self._gui is None
      if broadcast and gui_missing:
          _TaipyLogger._get_logger().error(
              "A callback is set to be broadcast to all states of "
              "the GUI but no GUI is provided to the event processor."
          )
          raise NoGuiDefinedInEventProcessor()
      extra_args = [] if callback_args is None else callback_args
      return _Callback(callback, args=extra_args, broadcast=broadcast, filter=filter)
  def __register_callback(self, topic, cb):
      """Append `cb` to the callbacks registered for `topic`.

      The callback list of the topic is created on first use.
      """
      registered = self._topic_callbacks_map.get(topic)
      if registered is None:
          registered = self._topic_callbacks_map[topic] = []
      registered.append(cb)
  def _process_event(self, event: Event) -> None:
      """Run every registered callback whose topic and filter accept `event`.

      Arguments:
          event (Event): The event to dispatch.
      """
      for topic, callbacks in self._topic_callbacks_map.items():
          if not Notifier._is_matching(event, topic):
              continue
          for cb in callbacks:
              # A callback without filter accepts every event of its topic.
              if cb.filter and not cb.filter(event):
                  continue
              self.__do_process(cb, event)
  def __do_process(self, cb, event: Event) -> None:
      """Invoke a single callback for `event`.

      The "predefined_args" entry is removed from the event metadata and placed
      right after the event in the callback arguments. A regular callback then
      receives the GUI followed by its own extra arguments. A broadcast callback
      is dispatched through the GUI with its own extra arguments only.

      Arguments:
          cb: The callback wrapper providing `callback`, `args` and `broadcast`.
          event (Event): The event being processed.
      """
      leading_args = event.metadata.pop("predefined_args", [])
      if not cb.broadcast:
          cb.callback(event, *leading_args, self._gui, *cb.args)
          return
      if self._gui:
          self._gui.broadcast_callback(cb.callback, [event, *leading_args, *cb.args])
          return
      # Broadcasting is impossible without a GUI: report it and drop the call.
      _TaipyLogger._get_logger().error(
          "A callback is set to be broadcast to all states of "
          "the GUI but no GUI is provided to the event processor."
      )