chat_streaming.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # Copyright 2021-2024 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. # -----------------------------------------------------------------------------------------
  12. # To execute this script, make sure that the taipy-gui package is installed in your
  13. # Python environment and run:
  14. # python <script>
  15. # -----------------------------------------------------------------------------------------
  16. import datetime
  17. import re
  18. import time
  19. import typing as t
  20. import requests # type: ignore[import-untyped]
  21. from taipy.gui import Gui, Icon, State, get_state_id, invoke_callback, invoke_long_callback
  22. # The Wikipedia API used to generate content for a date
  23. wiki_url = "https://en.wikipedia.org/api/rest_v1/feed/onthisday/{type}/{month}/{day}"
  24. event_types = {
  25. "happen": "events",
  26. "passé": "events",
  27. "born": "births",
  28. "né": "births",
  29. "dead": "deaths",
  30. "mort": "deaths",
  31. }
  32. user_agent = "https://taipy.io/demo"
  33. # The list of messages
  34. messages: list[tuple[str, str, str]] = [] # (Message id, message, sender)
  35. # The two users of this app
  36. users = [
  37. ["wikipedia", Icon("https://www.wikipedia.org/static/apple-touch/wikipedia.png", "Wikipedia")],
  38. ["taipy", Icon("https://docs.taipy.io/en/latest/assets/images/favicon.png", "Taipy")],
  39. ]
  40. # Initialize the user state
  41. def on_init(state: State):
  42. # Messages are for this user only
  43. state.messages = []
  44. # Add the image if there is one in the Wikipedia returned data
  45. def add_image_to_message(state: State, idx: int, text: str, image_url: str):
  46. msg_content: str = state.messages[idx][1]
  47. if (pos := msg_content.find(text)) > -1:
  48. msg_content = msg_content[: pos + len(text)] + f"\n\n![{text}]({image_url})" + msg_content[pos + len(text) :]
  49. set_message(state, msg_content, idx)
  50. # Invoked by update_message through a thread
  51. def update_message_with_image(gui: Gui, state_id: str, message_idx: int, text: str, image: dict):
  52. if src := image.get("source"):
  53. time.sleep(0.2) # Apply the typewriter effect
  54. invoke_callback(
  55. gui,
  56. state_id,
  57. add_image_to_message,
  58. [message_idx, text, src],
  59. )
  60. # Invoked by query_wikipedia()
  61. def update_message(state: State, json, event_type: str, for_date: str, idx: int):
  62. if isinstance(json, dict):
  63. # Initial response content
  64. set_message(state, f"{event_type} for {for_date}:\n", idx)
  65. for event in json.get(event_type, []):
  66. time.sleep(0.2) # Apply the typewriter effect
  67. # Update response text
  68. append_to_message(state, f"\n* {event.get('year', '')}: {event.get('text', '')}", idx)
  69. # Invoke update_message_with_image() in a separated thread
  70. invoke_long_callback(
  71. state=state,
  72. user_function=update_message_with_image,
  73. user_function_args=[
  74. state.get_gui(),
  75. get_state_id(state),
  76. idx,
  77. event.get("text", ""),
  78. pages[0].get("thumbnail", {}) if (pages := event.get("pages", [])) and len(pages) else {},
  79. ],
  80. )
  81. # Set a new message or append to an existing message.
  82. # Return the message index in the list.
  83. def set_message(state: State, message: str, idx: t.Optional[int] = None):
  84. if idx is not None and idx < len(state.messages):
  85. msg = state.messages[idx]
  86. state.messages[idx] = (msg[0], message, msg[2])
  87. else:
  88. idx = len(state.messages)
  89. state.messages.append((f"{len(state.messages)}", message, users[0][0]))
  90. state.refresh("messages")
  91. return idx
  92. # Append text to an existing message
  93. def append_to_message(state: State, message: str, idx: int):
  94. if idx < len(state.messages):
  95. msg = state.messages[idx]
  96. state.messages[idx] = (msg[0], f"{msg[1]}{message}", msg[2])
  97. state.refresh("messages")
  98. return idx
  99. # Invoke the Wikipedia API. This is invoked by send_message()
  100. def request_wikipedia(gui: Gui, state_id: str, event_type: str, month: str, day: str):
  101. # Let the user known that a query was sent
  102. idx = invoke_callback(
  103. gui,
  104. state_id,
  105. set_message,
  106. ["Fetching information from Wikipedia ..."],
  107. )
  108. request = wiki_url.format(type=event_type, month=month, day=day)
  109. req = requests.get(request, headers={"accept": "application/json; charset=utf-8;", "User-Agent": user_agent})
  110. # Handle the response
  111. if req.status_code == 200:
  112. # Display the response
  113. invoke_callback(
  114. gui,
  115. state_id,
  116. update_message,
  117. [req.json(), event_type, f"{day}/{month}", idx],
  118. )
  119. else:
  120. # Display the error
  121. invoke_callback(
  122. gui,
  123. state_id,
  124. set_message,
  125. [f"Wikipedia API call failed: {req.status_code}", idx],
  126. )
  127. # Invoked by the 'on_action' callback of the chat control when the user presses the Send button
  128. def send_message(state: State, id: str, payload: dict):
  129. args = payload.get("args", [])
  130. # Display the request
  131. state.messages.append((f"{len(state.messages)}", args[2], args[3]))
  132. state.refresh("messages")
  133. # Analyse the request
  134. request = args[2].lower()
  135. type_event = None
  136. for word in event_types:
  137. if word in request:
  138. type_event = event_types[word]
  139. break
  140. type_event = type_event if type_event else "events"
  141. month = None
  142. day = None
  143. for m in re.finditer(r"(\d\d?)", request):
  144. if month is None:
  145. month = m.group()
  146. elif day is None:
  147. day = m.group()
  148. break
  149. if month is None:
  150. month = f"{datetime.datetime.now().month}"
  151. if day is None:
  152. day = f"{datetime.datetime.now().day}"
  153. # Process the request
  154. invoke_long_callback(
  155. state=state,
  156. user_function=request_wikipedia,
  157. user_function_args=[state.get_gui(), get_state_id(state), type_event, month, day],
  158. )
  159. page = """
  160. <|{messages}|chat|users={users}|on_action=send_message|height=80vh|>
  161. """
  162. if __name__ == "__main__":
  163. Gui(page).run(title="Chat - Ask Wikipedia")