air.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import asyncio
  2. import gzip
  3. import json
  4. import re
  5. from typing import Any, Dict, Optional
  6. import httpx
  7. import socketio
  8. import socketio.exceptions
  9. from . import background_tasks, core
  10. from .client import Client
  11. from .logging import log
  12. RELAY_HOST = 'https://on-air.nicegui.io/'
  13. class Air:
  14. def __init__(self, token: str) -> None:
  15. self.token = token
  16. self.relay = socketio.AsyncClient()
  17. self.client = httpx.AsyncClient(app=core.app)
  18. self.connecting = False
  19. @self.relay.on('http')
  20. async def _handle_http(data: Dict[str, Any]) -> Dict[str, Any]:
  21. headers: Dict[str, Any] = data['headers']
  22. headers.update({'Accept-Encoding': 'identity', 'X-Forwarded-Prefix': data['prefix']})
  23. url = 'http://test' + data['path']
  24. request = self.client.build_request(
  25. data['method'],
  26. url,
  27. params=data['params'],
  28. headers=headers,
  29. content=data['body'],
  30. )
  31. response = await self.client.send(request)
  32. instance_id = data['instance-id']
  33. content = response.content.replace(
  34. b'const extraHeaders = {};',
  35. (f'const extraHeaders = {{ "fly-force-instance-id" : "{instance_id}" }};').encode(),
  36. )
  37. match = re.search(b'const query = ({.*?})', content)
  38. if match:
  39. new_js_object = match.group(1).decode().rstrip('}') + ", 'fly_instance_id' : '" + instance_id + "'}"
  40. content = content.replace(match.group(0), f'const query = {new_js_object}'.encode())
  41. compressed = gzip.compress(content)
  42. response.headers.update({'content-encoding': 'gzip', 'content-length': str(len(compressed))})
  43. return {
  44. 'status_code': response.status_code,
  45. 'headers': response.headers.multi_items(),
  46. 'content': compressed,
  47. }
  48. @self.relay.on('ready')
  49. def _handle_ready(data: Dict[str, Any]) -> None:
  50. core.app.urls.add(data['device_url'])
  51. print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
  52. @self.relay.on('error')
  53. def _handleerror(data: Dict[str, Any]) -> None:
  54. print('Error:', data['message'], flush=True)
  55. @self.relay.on('handshake')
  56. def _handle_handshake(data: Dict[str, Any]) -> bool:
  57. client_id = data['client_id']
  58. if client_id not in Client.instances:
  59. return False
  60. client = Client.instances[client_id]
  61. client.environ = data['environ']
  62. client.on_air = True
  63. client.handle_handshake()
  64. return True
  65. @self.relay.on('client_disconnect')
  66. def _handle_disconnect(data: Dict[str, Any]) -> None:
  67. client_id = data['client_id']
  68. if client_id not in Client.instances:
  69. return
  70. Client.instances[client_id].handle_disconnect()
  71. @self.relay.on('event')
  72. def _handle_event(data: Dict[str, Any]) -> None:
  73. client_id = data['client_id']
  74. if client_id not in Client.instances:
  75. return
  76. client = Client.instances[client_id]
  77. if data['msg']['args'] and data['msg']['args'][0].startswith('{"socket_id":'):
  78. args = json.loads(data['msg']['args'][0])
  79. args['socket_id'] = client_id # HACK: translate socket_id of ui.scene's init event
  80. data['msg']['args'][0] = json.dumps(args)
  81. client.handle_event(data['msg'])
  82. @self.relay.on('javascript_response')
  83. def _handle_javascript_response(data: Dict[str, Any]) -> None:
  84. client_id = data['client_id']
  85. if client_id not in Client.instances:
  86. return
  87. client = Client.instances[client_id]
  88. client.handle_javascript_response(data['msg'])
  89. @self.relay.on('out_of_time')
  90. async def _handle_out_of_time() -> None:
  91. print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
  92. await self.connect()
  93. @self.relay.on('reconnect')
  94. async def _handle_reconnect(_: Dict[str, Any]) -> None:
  95. await self.connect()
  96. async def connect(self) -> None:
  97. """Connect to the NiceGUI On Air server."""
  98. if self.connecting:
  99. return
  100. self.connecting = True
  101. backoff_time = 1
  102. while True:
  103. try:
  104. if self.relay.connected:
  105. await self.relay.disconnect()
  106. await self.relay.connect(
  107. f'{RELAY_HOST}?device_token={self.token}',
  108. socketio_path='/on_air/socket.io',
  109. transports=['websocket', 'polling'], # favor websocket over polling
  110. )
  111. break
  112. except socketio.exceptions.ConnectionError:
  113. pass
  114. except ValueError: # NOTE this sometimes happens when the internal socketio client is not yet ready
  115. await self.relay.disconnect()
  116. except Exception:
  117. log.exception('Could not connect to NiceGUI On Air server.')
  118. await asyncio.sleep(backoff_time)
  119. backoff_time = min(backoff_time * 2, 32)
  120. self.connecting = False
  121. async def disconnect(self) -> None:
  122. """Disconnect from the NiceGUI On Air server."""
  123. await self.relay.disconnect()
  124. async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
  125. """Emit a message to the NiceGUI On Air server."""
  126. if self.relay.connected:
  127. await self.relay.emit('forward', {'event': message_type, 'data': data, 'room': room})
  128. @staticmethod
  129. def is_air_target(target_id: str) -> bool:
  130. """Whether the given target ID is an On Air client or a SocketIO room."""
  131. if target_id in Client.instances:
  132. return Client.instances[target_id].on_air
  133. return target_id in core.sio.manager.rooms
  134. instance: Optional[Air] = None
  135. def connect() -> None:
  136. """Connect to the NiceGUI On Air server if there is an air instance."""
  137. if instance:
  138. background_tasks.create(instance.connect())
  139. def disconnect() -> None:
  140. """Disconnect from the NiceGUI On Air server if there is an air instance."""
  141. if instance:
  142. background_tasks.create(instance.disconnect())