air.py 8.6 KB


  1. import asyncio
  2. import gzip
  3. import json
  4. import re
  5. import signal
  6. from dataclasses import dataclass
  7. from typing import Any, AsyncIterator, Dict, Optional
  8. from uuid import uuid4
  9. import httpx
  10. import socketio
  11. import socketio.exceptions
  12. from . import background_tasks, core
  13. from .client import Client
  14. from .dataclasses import KWONLY_SLOTS
  15. from .logging import log
  16. RELAY_HOST = 'https://on-air.nicegui.io/'
  17. @dataclass(**KWONLY_SLOTS)
  18. class Stream:
  19. data: AsyncIterator[bytes]
  20. response: httpx.Response
  21. class Air:
  22. def __init__(self, token: str) -> None:
  23. self.token = token
  24. self.relay = socketio.AsyncClient()
  25. self.client = httpx.AsyncClient(app=core.app)
  26. self.streaming_client = httpx.AsyncClient()
  27. self.connecting = False
  28. self.streams: Dict[str, Stream] = {}
  29. self.remote_url: Optional[str] = None
  30. @self.relay.on('http')
  31. async def _handle_http(data: Dict[str, Any]) -> Dict[str, Any]:
  32. headers: Dict[str, Any] = data['headers']
  33. headers.update({'Accept-Encoding': 'identity', 'X-Forwarded-Prefix': data['prefix']})
  34. url = 'http://test' + data['path']
  35. request = self.client.build_request(
  36. data['method'],
  37. url,
  38. params=data['params'],
  39. headers=headers,
  40. content=data['body'],
  41. )
  42. response = await self.client.send(request)
  43. instance_id = data['instance-id']
  44. content = response.content.replace(
  45. b'const extraHeaders = {};',
  46. (f'const extraHeaders = {{ "fly-force-instance-id" : "{instance_id}" }};').encode(),
  47. )
  48. match = re.search(b'const query = ({.*?})', content)
  49. if match:
  50. new_js_object = match.group(1).decode().rstrip('}') + ", 'fly_instance_id' : '" + instance_id + "'}"
  51. content = content.replace(match.group(0), f'const query = {new_js_object}'.encode())
  52. compressed = gzip.compress(content)
  53. response.headers.update({'content-encoding': 'gzip', 'content-length': str(len(compressed))})
  54. return {
  55. 'status_code': response.status_code,
  56. 'headers': response.headers.multi_items(),
  57. 'content': compressed,
  58. }
  59. @self.relay.on('range-request')
  60. async def _handle_range_request(data: Dict[str, Any]) -> Dict[str, Any]:
  61. headers: Dict[str, Any] = data['headers']
  62. url = list(u for u in core.app.urls if self.remote_url != u)[0] + data['path']
  63. data['params']['nicegui_chunk_size'] = 1024
  64. request = self.client.build_request(
  65. data['method'],
  66. url,
  67. params=data['params'],
  68. headers=headers,
  69. )
  70. response = await self.streaming_client.send(request, stream=True)
  71. stream_id = str(uuid4())
  72. self.streams[stream_id] = Stream(data=response.aiter_bytes(), response=response)
  73. return {
  74. 'status_code': response.status_code,
  75. 'headers': response.headers.multi_items(),
  76. 'stream_id': stream_id,
  77. }
  78. @self.relay.on('read-stream')
  79. async def _handle_read_stream(stream_id: str) -> Optional[bytes]:
  80. try:
  81. return await self.streams[stream_id].data.__anext__()
  82. except StopAsyncIteration:
  83. await _handle_close_stream(stream_id)
  84. return None
  85. except Exception:
  86. await _handle_close_stream(stream_id)
  87. raise
  88. @self.relay.on('close-stream')
  89. async def _handle_close_stream(stream_id: str) -> None:
  90. await self.streams[stream_id].response.aclose()
  91. del self.streams[stream_id]
  92. @self.relay.on('ready')
  93. def _handle_ready(data: Dict[str, Any]) -> None:
  94. core.app.urls.add(data['device_url'])
  95. self.remote_url = data['device_url']
  96. if core.app.config.show_welcome_message:
  97. print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
  98. @self.relay.on('error')
  99. def _handleerror(data: Dict[str, Any]) -> None:
  100. print('Error:', data['message'], flush=True)
  101. @self.relay.on('handshake')
  102. def _handle_handshake(data: Dict[str, Any]) -> bool:
  103. client_id = data['client_id']
  104. if client_id not in Client.instances:
  105. return False
  106. client = Client.instances[client_id]
  107. client.environ = data['environ']
  108. client.on_air = True
  109. client.handle_handshake()
  110. return True
  111. @self.relay.on('client_disconnect')
  112. def _handle_disconnect(data: Dict[str, Any]) -> None:
  113. client_id = data['client_id']
  114. if client_id not in Client.instances:
  115. return
  116. Client.instances[client_id].handle_disconnect()
  117. @self.relay.on('event')
  118. def _handle_event(data: Dict[str, Any]) -> None:
  119. client_id = data['client_id']
  120. if client_id not in Client.instances:
  121. return
  122. client = Client.instances[client_id]
  123. if data['msg']['args'] and data['msg']['args'][0].startswith('{"socket_id":'):
  124. args = json.loads(data['msg']['args'][0])
  125. args['socket_id'] = client_id # HACK: translate socket_id of ui.scene's init event
  126. data['msg']['args'][0] = json.dumps(args)
  127. client.handle_event(data['msg'])
  128. @self.relay.on('javascript_response')
  129. def _handle_javascript_response(data: Dict[str, Any]) -> None:
  130. client_id = data['client_id']
  131. if client_id not in Client.instances:
  132. return
  133. client = Client.instances[client_id]
  134. client.handle_javascript_response(data['msg'])
  135. @self.relay.on('out_of_time')
  136. async def _handle_out_of_time() -> None:
  137. print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
  138. await self.connect()
  139. @self.relay.on('reconnect')
  140. async def _handle_reconnect(_: Dict[str, Any]) -> None:
  141. await self.connect()
  142. async def connect(self) -> None:
  143. """Connect to the NiceGUI On Air server."""
  144. # ensure that the connection is closed when the process is terminated
  145. signal.signal(signal.SIGINT, disconnect)
  146. signal.signal(signal.SIGTERM, disconnect)
  147. if self.connecting:
  148. return
  149. self.connecting = True
  150. backoff_time = 1
  151. while True:
  152. try:
  153. if self.relay.connected:
  154. await self.relay.disconnect()
  155. await self.relay.connect(
  156. f'{RELAY_HOST}?device_token={self.token}',
  157. socketio_path='/on_air/socket.io',
  158. transports=['websocket', 'polling'], # favor websocket over polling
  159. )
  160. break
  161. except socketio.exceptions.ConnectionError:
  162. pass
  163. except ValueError: # NOTE this sometimes happens when the internal socketio client is not yet ready
  164. await self.relay.disconnect()
  165. except Exception:
  166. log.exception('Could not connect to NiceGUI On Air server.')
  167. await asyncio.sleep(backoff_time)
  168. backoff_time = min(backoff_time * 2, 32)
  169. self.connecting = False
  170. async def disconnect(self) -> None:
  171. """Disconnect from the NiceGUI On Air server."""
  172. for stream in self.streams.values():
  173. await stream.response.aclose()
  174. self.streams.clear()
  175. await self.relay.disconnect()
  176. async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
  177. """Emit a message to the NiceGUI On Air server."""
  178. if self.relay.connected:
  179. await self.relay.emit('forward', {'event': message_type, 'data': data, 'room': room})
  180. @staticmethod
  181. def is_air_target(target_id: str) -> bool:
  182. """Whether the given target ID is an On Air client or a SocketIO room."""
  183. if target_id in Client.instances:
  184. return Client.instances[target_id].on_air
  185. return target_id in core.sio.manager.rooms
  186. def connect() -> None:
  187. """Connect to the NiceGUI On Air server if there is an air instance."""
  188. if core.air:
  189. background_tasks.create(core.air.connect())
  190. def disconnect() -> None:
  191. """Disconnect from the NiceGUI On Air server if there is an air instance."""
  192. if core.air:
  193. background_tasks.create(core.air.disconnect())