air.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. import asyncio
  2. import gzip
  3. import re
  4. from typing import Any, Dict
  5. import httpx
  6. import socketio
  7. from socketio import AsyncClient
  8. from . import globals # pylint: disable=redefined-builtin
  9. from .nicegui import handle_disconnect, handle_event, handle_handshake, handle_javascript_response
  10. RELAY_HOST = 'https://on-air.nicegui.io/'
  11. class Air:
  12. def __init__(self, token: str) -> None:
  13. self.token = token
  14. self.relay = AsyncClient()
  15. self.client = httpx.AsyncClient(app=globals.app)
  16. self.connecting = False
  17. @self.relay.on('http')
  18. async def on_http(data: Dict[str, Any]) -> Dict[str, Any]:
  19. headers: Dict[str, Any] = data['headers']
  20. headers.update({'Accept-Encoding': 'identity', 'X-Forwarded-Prefix': data['prefix']})
  21. url = 'http://test' + data['path']
  22. request = self.client.build_request(
  23. data['method'],
  24. url,
  25. params=data['params'],
  26. headers=headers,
  27. content=data['body'],
  28. )
  29. response = await self.client.send(request)
  30. instance_id = data['instance-id']
  31. content = response.content.replace(
  32. b'const extraHeaders = {};',
  33. (f'const extraHeaders = {{ "fly-force-instance-id" : "{instance_id}" }};').encode(),
  34. )
  35. match = re.search(b'const query = ({.*?})', content)
  36. if match:
  37. new_js_object = match.group(1).decode().rstrip('}') + ", 'fly_instance_id' : '" + instance_id + "'}"
  38. content = content.replace(match.group(0), f'const query = {new_js_object}'.encode())
  39. compressed = gzip.compress(content)
  40. response.headers.update({'content-encoding': 'gzip', 'content-length': str(len(compressed))})
  41. return {
  42. 'status_code': response.status_code,
  43. 'headers': response.headers.multi_items(),
  44. 'content': compressed,
  45. }
  46. @self.relay.on('ready')
  47. def on_ready(data: Dict[str, Any]) -> None:
  48. globals.app.urls.add(data['device_url'])
  49. print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
  50. @self.relay.on('error')
  51. def on_error(data: Dict[str, Any]) -> None:
  52. print('Error:', data['message'], flush=True)
  53. @self.relay.on('handshake')
  54. def on_handshake(data: Dict[str, Any]) -> bool:
  55. client_id = data['client_id']
  56. if client_id not in globals.clients:
  57. return False
  58. client = globals.clients[client_id]
  59. client.environ = data['environ']
  60. client.on_air = True
  61. handle_handshake(client)
  62. return True
  63. @self.relay.on('client_disconnect')
  64. def on_disconnect(data: Dict[str, Any]) -> None:
  65. client_id = data['client_id']
  66. if client_id not in globals.clients:
  67. return
  68. client = globals.clients[client_id]
  69. handle_disconnect(client)
  70. @self.relay.on('event')
  71. def on_event(data: Dict[str, Any]) -> None:
  72. client_id = data['client_id']
  73. if client_id not in globals.clients:
  74. return
  75. client = globals.clients[client_id]
  76. if isinstance(data['msg']['args'], dict) and 'socket_id' in data['msg']['args']:
  77. data['msg']['args']['socket_id'] = client_id # HACK: translate socket_id of ui.scene's init event
  78. handle_event(client, data['msg'])
  79. @self.relay.on('javascript_response')
  80. def on_javascript_response(data: Dict[str, Any]) -> None:
  81. client_id = data['client_id']
  82. if client_id not in globals.clients:
  83. return
  84. client = globals.clients[client_id]
  85. handle_javascript_response(client, data['msg'])
  86. @self.relay.on('out_of_time')
  87. async def on_move() -> None:
  88. print('Sorry, you have reached the time limit of this NiceGUI On Air preview.', flush=True)
  89. await self.connect()
  90. @self.relay.on('reconnect')
  91. async def on_reconnect(_: Dict[str, Any]) -> None:
  92. await self.connect()
  93. async def connect(self) -> None:
  94. if self.connecting:
  95. return
  96. self.connecting = True
  97. backoff_time = 1
  98. while True:
  99. try:
  100. if self.relay.connected:
  101. await self.relay.disconnect()
  102. await self.relay.connect(
  103. f'{RELAY_HOST}?device_token={self.token}',
  104. socketio_path='/on_air/socket.io',
  105. transports=['websocket', 'polling'], # favor websocket over polling
  106. )
  107. break
  108. except socketio.exceptions.ConnectionError:
  109. pass
  110. except ValueError: # NOTE this sometimes happens when the internal socketio client is not yet ready
  111. await self.relay.disconnect()
  112. except Exception:
  113. globals.log.exception('Could not connect to NiceGUI On Air server.')
  114. await asyncio.sleep(backoff_time)
  115. backoff_time = min(backoff_time * 2, 32)
  116. self.connecting = False
  117. async def disconnect(self) -> None:
  118. await self.relay.disconnect()
  119. async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
  120. if self.relay.connected:
  121. await self.relay.emit('forward', {'event': message_type, 'data': data, 'room': room})