瀏覽代碼

feat: add streaming to XHR driver client

KernelDeimos 9 月之前
父節點
當前提交
7600d9b07c
共有 1 個文件被更改,包括 67 次插入0 次删除
  1. 67 0
      src/puter-js/src/lib/utils.js

+ 67 - 0
src/puter-js/src/lib/utils.js

@@ -268,9 +268,76 @@ async function driverCall_(
     if ( settings.responseType ) {
         xhr.responseType = settings.responseType;
     }
+    
+    // ===============================================
+    // TO UNDERSTAND THIS CODE, YOU MUST FIRST
+    // UNDERSTAND THE FOLLOWING TEXT:
+    //
+    // Everything between here and the comment reading
+    // "=== END OF STREAMING ===" is ONLY for handling
+    // requests with content type "application/x-ndjson"
+    // ===============================================
+    
+    let is_stream = false;
+    let got_headers = false;
+    let signal_stream_update = null;
+    let lastLength = 0;
+    let response_complete = false;
+    const parts_received = [];
+    xhr.onreadystatechange = () => {
+        if ( got_headers ) return;
+        got_headers = true;
+        if ( xhr.readyState >= 2 ) {
+            if ( xhr.getResponseHeader("Content-Type") !==
+                'application/x-ndjson'
+            ) return;
+            is_stream = true;
+            const Stream = async function* Stream () {
+                while ( ! response_complete ) {
+                    const tp = new TeePromise();
+                    signal_stream_update = tp.resolve.bind(tp);
+                    await tp;
+                    if ( response_complete ) break;
+                    while ( parts_received.length > 0 ) {
+                        const value = parts_received.pop();
+                        const parts = value.split('\n');
+                        for ( const part of parts ) {
+                            if ( part.trim() === '' ) continue;
+                            yield JSON.parse(part);
+                        }
+                    }
+                }
+            }
+        
+            return resolve_func(Stream());
+        }
+    };
+
+    xhr.onprogress = function() {
+        if ( ! signal_stream_update ) return;
+
+        const newText = xhr.responseText.slice(lastLength);
+        lastLength = xhr.responseText.length; // Update lastLength to the current length
+        
+        parts_received.push(newText);
+        signal_stream_update();
+    };
+    
+    xhr.addEventListener('load', () => {
+        response_complete = true;
+    });
+
+    // ========================
+    // === END OF STREAMING ===
+    // ========================
 
     // load: success or error
     xhr.addEventListener('load', async function(response){
+        response_complete = true;
+        if ( is_stream ) {
+            signal_stream_update?.();
+            return;
+        }
         const resp = await parseResponse(response.target);
         // HTTP Error - unauthorized
         if(response.status === 401 || resp?.code === "token_auth_failed"){