helpers.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # Copyright 2021-2025 Avaiga Private Limited
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  4. # the License. You may obtain a copy of the License at
  5. #
  6. # http://www.apache.org/licenses/LICENSE-2.0
  7. #
  8. # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
  9. # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
  10. # specific language governing permissions and limitations under the License.
  11. import json
  12. import logging
  13. import socket
  14. import time
  15. import typing as t
  16. import urllib.request
  17. import warnings
  18. import socketio
  19. from taipy.gui import Gui, Html, Markdown
  20. from taipy.gui._renderers.builder import _Builder
  21. from taipy.gui._warnings import TaipyGuiWarning
  22. from taipy.gui.servers.utils import get_server_type
  23. from taipy.gui.utils._variable_directory import _reset_name_map
  24. from taipy.gui.utils.expr_var_name import _reset_expr_var_name
  25. class WebSocketTestClient:
  26. def __init__(self, url: str = "http://localhost:5000", auto_connect=True):
  27. self.url = url
  28. self.sio = socketio.Client()
  29. self.messages: t.List = []
  30. # Register event handlers
  31. self.sio.on("message", self._on_message)
  32. # Auto connect if specified
  33. if auto_connect:
  34. self.connect()
  35. def _on_message(self, data):
  36. self.messages.append(data)
  37. def connect(self):
  38. self.sio.connect(f"{self.url}/socket.io")
  39. def disconnect(self):
  40. self.sio.disconnect()
  41. def emit(self, *args, **kwargs):
  42. self.sio.emit(*args, **kwargs)
  43. self.sio.sleep(1)
  44. def get_received(self, timeout=1):
  45. self.sio.sleep(timeout)
  46. # wrap the messages in a dict to match the expected format
  47. wrapped_messages = []
  48. wrapped_messages.extend({"name": "message", "args": message} for message in self.messages)
  49. self.messages = []
  50. return wrapped_messages
  51. def get(self, url):
  52. with urllib.request.urlopen(self.url + url) as response:
  53. return response
  54. def get_sid(self) -> t.Optional[str]:
  55. return self.sio.get_sid()
  56. class Helpers:
  57. @staticmethod
  58. def test_cleanup():
  59. _Builder._reset_key()
  60. _reset_name_map()
  61. _reset_expr_var_name()
  62. Gui._Gui__extensions.clear()
  63. Gui._Gui__shared_variables.clear()
  64. Gui._Gui__content_providers.clear()
  65. @staticmethod
  66. def test_control_md(gui: Gui, md_string: str, expected_values: t.Union[str, t.List]):
  67. gui.add_page("test", Markdown(md_string, frame=None))
  68. Helpers._test_control(gui, expected_values)
  69. @staticmethod
  70. def test_control_html(gui: Gui, html_string: str, expected_values: t.Union[str, t.List]):
  71. gui.add_page("test", Html(html_string, frame=None))
  72. Helpers._test_control(gui, expected_values)
  73. @staticmethod
  74. def test_control_builder(gui: Gui, builder_page, expected_values: t.Union[str, t.List]):
  75. gui.add_page("test", builder_page)
  76. Helpers._test_control(gui, expected_values)
  77. @staticmethod
  78. def _test_control(gui: Gui, expected_values: t.Union[str, t.List]):
  79. gui.run(run_server=False, single_client=True, stylekit=False)
  80. client = gui._server.test_client()
  81. response = client.get("/taipy-jsx/test")
  82. assert response.status_code == 200, f"response.status_code {response.status_code} != 200"
  83. response_data = Helpers.get_response_data(response)
  84. assert isinstance(response_data, t.Dict), "response_data is not Dict"
  85. assert "jsx" in response_data, "jsx not in response_data"
  86. jsx = response_data["jsx"]
  87. logging.getLogger().debug(jsx)
  88. if isinstance(expected_values, str):
  89. assert jsx == expected_values, f"{jsx} != {expected_values}"
  90. elif isinstance(expected_values, list):
  91. for expected_value in expected_values:
  92. assert expected_value in jsx, f"{expected_value} not in {jsx}"
  93. @staticmethod
  94. def assert_outward_ws_message(received_message, type, varname, value):
  95. assert isinstance(received_message, dict)
  96. assert "name" in received_message and received_message["name"] == "message"
  97. assert "args" in received_message
  98. args = received_message["args"]
  99. assert "type" in args and args["type"] == type
  100. assert "payload" in args
  101. payload_arr = args["payload"]
  102. found_payload = False
  103. for payload in payload_arr:
  104. if "name" in payload and varname in payload["name"]:
  105. assert "payload" in payload and "value" in payload["payload"] and payload["payload"]["value"] == value
  106. found_payload = True
  107. logging.getLogger().debug(payload["payload"]["value"])
  108. assert found_payload
  109. @staticmethod
  110. def assert_outward_simple_ws_message(received_message, type, varname, value):
  111. assert isinstance(received_message, dict)
  112. assert "name" in received_message and received_message["name"] == "message"
  113. assert "args" in received_message
  114. args = received_message["args"]
  115. assert "type" in args and args["type"] == type
  116. assert "name" in args and args["name"] == varname
  117. assert "payload" in args
  118. payload = args["payload"]
  119. assert "value" in payload and payload["value"] == value
  120. logging.getLogger().debug(payload["value"])
  121. @staticmethod
  122. def assert_outward_ws_simple_message(received_message, aType, values):
  123. assert isinstance(received_message, dict)
  124. assert "name" in received_message and received_message["name"] == "message"
  125. assert "args" in received_message
  126. args = received_message["args"]
  127. assert "type" in args and args["type"] == aType
  128. for k, v in values.items():
  129. assert k in args and args[k] == v
  130. logging.getLogger().debug(f"{k}: {args[k]}")
  131. @staticmethod
  132. def assert_outward_ws_multiple_message(received_message, type, array_len: int):
  133. assert isinstance(received_message, dict)
  134. assert "name" in received_message and received_message["name"] == "message"
  135. assert "args" in received_message
  136. args = received_message["args"]
  137. assert "type" in args and args["type"] == type
  138. assert "payload" in args
  139. payload = args["payload"]
  140. assert isinstance(payload, list)
  141. assert len(payload) == array_len
  142. logging.getLogger().debug(payload)
  143. @staticmethod
  144. def create_scope_and_get_sid(gui: Gui) -> str:
  145. sid = "test"
  146. gui._bindings()._get_or_create_scope(sid)
  147. return sid
  148. @staticmethod
  149. def port_check():
  150. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  151. s.settimeout(1)
  152. if s.connect_ex(("127.0.0.1", 5000)) == 0:
  153. s.close()
  154. return True
  155. else:
  156. s.close()
  157. return False
  158. @staticmethod
  159. def run_e2e(gui, **kwargs):
  160. kwargs["run_in_thread"] = True
  161. kwargs["single_client"] = True
  162. kwargs["run_browser"] = False
  163. kwargs["stylekit"] = kwargs.get("stylekit", False)
  164. with warnings.catch_warnings(record=True):
  165. gui.run(**kwargs)
  166. while not Helpers.port_check():
  167. time.sleep(0.1)
  168. @staticmethod
  169. def run_e2e_multi_client(gui: Gui):
  170. with warnings.catch_warnings(record=True):
  171. gui.run(run_server=False, run_browser=False, single_client=False, stylekit=False)
  172. gui._server.run(
  173. host=gui._get_config("host", "127.0.0.1"),
  174. port=gui._get_config("port", 5000),
  175. client_url=gui._get_config("client_url", "http://localhost:{port}"),
  176. debug=False,
  177. use_reloader=False,
  178. server_log=False,
  179. run_in_thread=True,
  180. allow_unsafe_werkzeug=False,
  181. notebook_proxy=False,
  182. port_auto_ranges=gui._get_config("port_auto_ranges", None),
  183. )
  184. while not Helpers.port_check():
  185. time.sleep(0.1)
  186. @staticmethod
  187. def get_taipy_warnings(warns: t.List[warnings.WarningMessage]) -> t.List[warnings.WarningMessage]:
  188. return [w for w in warns if issubclass(w.category, TaipyGuiWarning)]
  189. @staticmethod
  190. def get_response_data(response):
  191. if get_server_type() == "flask":
  192. return json.loads(response.get_data().decode("utf-8", "ignore"))
  193. elif get_server_type() == "fastapi":
  194. return response.json()
  195. return None
  196. @staticmethod
  197. def get_response_raw_data(response):
  198. if get_server_type() == "flask":
  199. return response.get_data().decode("utf-8", "ignore")
  200. elif get_server_type() == "fastapi":
  201. return response.text
  202. return None
  203. @staticmethod
  204. def get_socketio_test_client():
  205. return WebSocketTestClient()