|
@@ -2,7 +2,9 @@ import asyncio
|
|
import gzip
|
|
import gzip
|
|
import json
|
|
import json
|
|
import re
|
|
import re
|
|
-from typing import Any, Dict, Optional
|
|
|
|
|
|
+from dataclasses import dataclass
|
|
|
|
+from typing import Any, AsyncIterator, Dict, Optional
|
|
|
|
+from uuid import uuid4
|
|
|
|
|
|
import httpx
|
|
import httpx
|
|
import socketio
|
|
import socketio
|
|
@@ -10,18 +12,28 @@ import socketio.exceptions
|
|
|
|
|
|
from . import background_tasks, core
|
|
from . import background_tasks, core
|
|
from .client import Client
|
|
from .client import Client
|
|
|
|
+from .dataclasses import KWONLY_SLOTS
|
|
from .logging import log
|
|
from .logging import log
|
|
|
|
|
|
RELAY_HOST = 'https://on-air.nicegui.io/'
|
|
RELAY_HOST = 'https://on-air.nicegui.io/'
|
|
|
|
|
|
|
|
|
|
|
|
+@dataclass(**KWONLY_SLOTS)
|
|
|
|
+class Stream:
|
|
|
|
+ data: AsyncIterator[bytes]
|
|
|
|
+ response: httpx.Response
|
|
|
|
+
|
|
|
|
+
|
|
class Air:
|
|
class Air:
|
|
|
|
|
|
def __init__(self, token: str) -> None:
|
|
def __init__(self, token: str) -> None:
|
|
self.token = token
|
|
self.token = token
|
|
self.relay = socketio.AsyncClient()
|
|
self.relay = socketio.AsyncClient()
|
|
self.client = httpx.AsyncClient(app=core.app)
|
|
self.client = httpx.AsyncClient(app=core.app)
|
|
|
|
+ self.streaming_client = httpx.AsyncClient()
|
|
self.connecting = False
|
|
self.connecting = False
|
|
|
|
+ self.streams: Dict[str, Stream] = {}
|
|
|
|
+ self.remote_url: Optional[str] = None
|
|
|
|
|
|
@self.relay.on('http')
|
|
@self.relay.on('http')
|
|
async def _handle_http(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
async def _handle_http(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
@@ -53,9 +65,46 @@ class Air:
|
|
'content': compressed,
|
|
'content': compressed,
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @self.relay.on('range-request')
|
|
|
|
+ async def _handle_range_request(data: Dict[str, Any]) -> Dict[str, Any]:
|
|
|
|
+ headers: Dict[str, Any] = data['headers']
|
|
|
|
+ url = list(u for u in core.app.urls if self.remote_url != u)[0] + data['path']
|
|
|
|
+ data['params']['nicegui_chunk_size'] = 1024
|
|
|
|
+ request = self.client.build_request(
|
|
|
|
+ data['method'],
|
|
|
|
+ url,
|
|
|
|
+ params=data['params'],
|
|
|
|
+ headers=headers,
|
|
|
|
+ )
|
|
|
|
+ response = await self.streaming_client.send(request, stream=True)
|
|
|
|
+ stream_id = str(uuid4())
|
|
|
|
+ self.streams[stream_id] = Stream(data=response.aiter_bytes(), response=response)
|
|
|
|
+ return {
|
|
|
|
+ 'status_code': response.status_code,
|
|
|
|
+ 'headers': response.headers.multi_items(),
|
|
|
|
+ 'stream_id': stream_id,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @self.relay.on('read-stream')
|
|
|
|
+ async def _handle_read_stream(stream_id: str) -> Optional[bytes]:
|
|
|
|
+ try:
|
|
|
|
+ return await self.streams[stream_id].data.__anext__()
|
|
|
|
+ except StopAsyncIteration:
|
|
|
|
+ await _handle_close_stream(stream_id)
|
|
|
|
+ return None
|
|
|
|
+ except Exception:
|
|
|
|
+ await _handle_close_stream(stream_id)
|
|
|
|
+ raise
|
|
|
|
+
|
|
|
|
+ @self.relay.on('close-stream')
|
|
|
|
+ async def _handle_close_stream(stream_id: str) -> None:
|
|
|
|
+ await self.streams[stream_id].response.aclose()
|
|
|
|
+ del self.streams[stream_id]
|
|
|
|
+
|
|
@self.relay.on('ready')
|
|
@self.relay.on('ready')
|
|
def _handle_ready(data: Dict[str, Any]) -> None:
|
|
def _handle_ready(data: Dict[str, Any]) -> None:
|
|
core.app.urls.add(data['device_url'])
|
|
core.app.urls.add(data['device_url'])
|
|
|
|
+ self.remote_url = data['device_url']
|
|
if core.app.config.show_welcome_message:
|
|
if core.app.config.show_welcome_message:
|
|
print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
|
|
print(f'NiceGUI is on air at {data["device_url"]}', flush=True)
|
|
|
|
|
|
@@ -139,6 +188,9 @@ class Air:
|
|
|
|
|
|
async def disconnect(self) -> None:
|
|
async def disconnect(self) -> None:
|
|
"""Disconnect from the NiceGUI On Air server."""
|
|
"""Disconnect from the NiceGUI On Air server."""
|
|
|
|
+ for stream in self.streams.values():
|
|
|
|
+ await stream.response.aclose()
|
|
|
|
+ self.streams.clear()
|
|
await self.relay.disconnect()
|
|
await self.relay.disconnect()
|
|
|
|
|
|
async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
|
|
async def emit(self, message_type: str, data: Dict[str, Any], room: str) -> None:
|