air.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import asyncio
  2. import gzip
  3. import re
  4. from typing import Any, Dict, Optional
  5. import httpx
  6. import socketio
  7. import socketio.exceptions
  8. from . import background_tasks, core
  9. from .client import Client
  10. from .logging import log
# Base URL of the NiceGUI On Air relay server.
# `Air.connect` appends the device token to it as the `device_token` query parameter.
RELAY_HOST = 'https://on-air.nicegui.io/'
  12. class Air:
  13. def __init__(self, token: str) -> None:
  14. self.token = token
  15. self.relay = socketio.AsyncClient()
  16. self.client = httpx.AsyncClient(app=core.app)
  17. self.connecting = False
  18. @self.relay.on('http')
  19. async def _handle_http(data: Dict[str, Any]) -> Dict[str, Any]:
  20. headers: Dict[str, Any] = data['headers']
  21. headers.update({'Accept-Encoding': 'identity', 'X-Forwarded-Prefix': data['prefix']})
  22. url = 'http://test' + data['path']
  23. request = self.client.build_request(
  24. data['method'],
  25. url,
  26. params=data['params'],
  27. headers=headers,
  28. content=data['body'],
  29. )
  30. response = await self.client.send(request)
  31. instance_id = data['instance-id']
  32. content = response.content.replace(
  33. b'const extraHeaders = {};',
  34. (f'const extraHeaders = {{ "fly-force-instance-id" : "{instance_id}" }};').encode(),
  35. )
  36. match = re.search(b'const query = ({.*?})', content)
  37. if match:
  38. new_js_object = match.group(1).decode().rstrip('}') + ", 'fly_instance_id' : '" + instance_id + "'}"
  39. content = content.replace(match.group(0), f'const query = {new_js_object}'.encode())
  40. compressed = gzip.compress(content)
  41. response.headers.update({'content-encoding': 'gzip', 'content-length': str(len(compressed))})
  42. return {
  43. 'status_code': response.status_code,
  44. 'headers': response.headers.multi_items(),
  45. 'content': compressed,
  46. }
  47. @self.relay.on('ready')
  48. def _handle_ready(data: Dict[str, Any]) -> None:
  49. core.app.urls.add(data['device_url'])
  50. print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
  51. @self.relay.on('error')
  52. def _handleerror(data: Dict[str, Any]) -> None:
  53. print('Error:', data['message'], flush=True)
  54. @self.relay.on('handshake')
  55. def _handle_handshake(data: Dict[str, Any]) -> bool:
  56. client_id = data['client_id']
  57. if client_id not in Client.instances:
  58. return False
  59. client = Client.instances[client_id]
  60. client.environ = data['environ']
  61. client.on_air = True
  62. client.handle_handshake()
  63. return True
  64. @self.relay.on('client_disconnect')
  65. def _handle_disconnect(data: Dict[str, Any]) -> None:
  66. client_id = data['client_id']
  67. if client_id not in Client.instances:
  68. return
  69. Client.instances[client_id].handle_disconnect()
  70. @self.relay.on('event')
  71. def _handle_event(data: Dict[str, Any]) -> None:
  72. client_id = data['client_id']
  73. if client_id not in Client.instances:
  74. return
  75. client = Client.instances[client_id]
  76. if isinstance(data['msg']['args'], dict) and 'socket_id' in data['msg']['args']:
  77. data['msg']['args']['socket_id'] = client_id # HACK: translate socket_id of ui.scene's init event
  78. client.handle_event(data['msg'])
  79. @self.relay.on('javascript_response')
  80. def _handle_javascript_response(data: Dict[str, Any]) -> None:
  81. client_id = data['client_id']
  82. if client_id not in Client.instances:
  83. return
  84. client = Client.instances[client_id]
  85. client.handle_javascript_response(data['msg'])
  86. @self.relay.on('out_of_time')
  87. async def _handle_out_of_time() -> None:
  88. print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
  89. await self.connect()
  90. @self.relay.on('reconnect')
  91. async def _handle_reconnect(_: Dict[str, Any]) -> None:
  92. await self.connect()
  93. async def connect(self) -> None:
  94. """Connect to the NiceGUI On Air server."""
  95. if self.connecting:
  96. return
  97. self.connecting = True
  98. backoff_time = 1
  99. while True:
  100. try:
  101. if self.relay.connected:
  102. await self.relay.disconnect()
  103. await self.relay.connect(
  104. f'{RELAY_HOST}?device_token={self.token}',
  105. socketio_path='/on_air/socket.io',
  106. transports=['websocket', 'polling'], # favor websocket over polling
  107. )
  108. break
  109. except socketio.exceptions.ConnectionError:
  110. pass
  111. except ValueError: # NOTE this sometimes happens when the internal socketio client is not yet ready
  112. await self.relay.disconnect()
  113. except Exception:
  114. log.exception('Could not connect to NiceGUI On Air server.')
  115. await asyncio.sleep(backoff_time)
  116. backoff_time = min(backoff_time * 2, 32)
  117. self.connecting = False
  118. async def disconnect(self) -> None:
  119. """Disconnect from the NiceGUI On Air server."""
  120. await self.relay.disconnect()
  121. async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
  122. """Emit a message to the NiceGUI On Air server."""
  123. if self.relay.connected:
  124. await self.relay.emit('forward', {'event': message_type, 'data': data, 'room': room})
  125. @staticmethod
  126. def is_air_target(target_id: str) -> bool:
  127. """Whether the given target ID is an On Air client or a SocketIO room."""
  128. if target_id in Client.instances:
  129. return Client.instances[target_id].on_air
  130. return target_id in core.sio.manager.rooms
# Module-level Air instance; None until one is assigned.
# The module-level connect() and disconnect() do nothing while it is unset.
instance: Optional[Air] = None
  132. def connect() -> None:
  133. """Connect to the NiceGUI On Air server if there is an air instance."""
  134. if instance:
  135. background_tasks.create(instance.connect())
  136. def disconnect() -> None:
  137. """Disconnect from the NiceGUI On Air server if there is an air instance."""
  138. if instance:
  139. background_tasks.create(instance.disconnect())