Browse Source

Add more streaming fixes

KernelDeimos 1 year ago
parent
commit
c3654ab148

+ 237 - 18
packages/phoenix/packages/pty/exports.js

@@ -16,19 +16,145 @@
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
-import { raceCase } from '../../src/promise.js';
+import { TeePromise, raceCase } from '../../src/promise.js';
 
 const encoder = new TextEncoder();
 
 const CHAR_LF = '\n'.charCodeAt(0);
 const CHAR_CR = '\r'.charCodeAt(0);
 
+const DONE = Symbol('done');
+
+class Channel {
+    constructor () {
+        this.chunks_ = [];
+
+        globalThis.chnl = this;
+
+        const events = ['write','consume','change'];
+        for ( const event of events ) {
+            this[`on_${event}_`] = [];
+            this[`emit_${event}_`] = () => {
+                for ( const listener of this[`on_${event}_`] ) {
+                    listener();
+                }
+            };
+        }
+
+        this.on('write', () => { this.emit_change_(); });
+        this.on('consume', () => { this.emit_change_(); });
+    }
+
+    on (event, listener) {
+        this[`on_${event}_`].push(listener);
+    }
+
+    off (event, listener) {
+        const index = this[`on_${event}_`].indexOf(listener);
+        if ( index !== -1 ) {
+            this[`on_${event}_`].splice(index, 1);
+        }
+    }
+
+    get () {
+        const cancel = new TeePromise();
+        const data = new TeePromise();
+        const done = new TeePromise();
+
+        let called = 0;
+
+        const on_data = () => {
+            if ( this.chunks_.length > 0 ) {
+                if ( called > 0 ) {
+                    throw new Error('called more than once');
+                }
+                called++;
+                const chunk = this.chunks_.shift();
+                console.log('shifted off chunk', chunk);
+                ( chunk === DONE ? done : data ).resolve(chunk);
+                this.off('write', on_data);
+                this.emit_consume_();
+            }
+        };
+
+        console.log('this case', this.chunks_.length);
+
+        this.on('write', on_data);
+        on_data();
+
+        const to_return = {
+            cancel: () => {
+                console.log('cancel called');
+                this.off('write', on_data);
+                cancel.resolve();
+            },
+            promise: raceCase({
+                cancel,
+                data,
+                done,
+            }),
+        };
+
+        console.log('to_return?', to_return);
+        return to_return;
+    }
+
+    write (chunk) {
+        this.chunks_.push(chunk);
+        this.emit_write_();
+    }
+
+    pushback (...chunks) {
+        console.log('pushing back...', chunks)
+        for ( let i = chunks.length - 1; i >= 0; i-- ) {
+            console.log('unshifting ', i, chunks[i]);
+            console.log('chunks_ before unshift', this.chunks_.length);
+            this.chunks_.unshift(chunks[i]);
+            console.log('chunks_ after unshift', this.chunks_.length);
+
+        }
+        this.emit_write_();
+    }
+
+    is_empty () {
+        return this.chunks_.length === 0;
+    }
+}
+
 export class BetterReader {
     constructor ({ delegate }) {
         this.delegate = delegate;
         this.chunks_ = [];
+        this.channel_ = new Channel();
+
+        this._init();
+    }
+
+    _init () {
+        let working = Promise.resolve();
+        this.channel_.on('consume', async () => {
+            await working;
+            working = new TeePromise();
+            if ( this.channel_.is_empty() ) {
+                await this.intake_();
+            }
+            working.resolve();
+        });
+        this.intake_();
     }
 
+    async intake_ () {
+        const { value, done } = await this.delegate.read();
+        if ( done ) {
+        console.log('writing to channel <DONE>');
+            this.channel_.write(DONE);
+            return;
+        }
+        console.log('writing to channel', value);
+        this.channel_.write(value);
+    }
+
+
     _create_cancel_response () {
         return {
             chunk: null,
@@ -41,12 +167,75 @@ export class BetterReader {
         };
     }
 
-    async read_and_get_info (opt_buffer, cancel_state) {
+    read_and_get_info (opt_buffer, cancel_state) {
+        if ( ! opt_buffer ) {
+            const { promise, cancel } = this.channel_.get();
+            return {
+                cancel,
+                promise: promise.then(([which, chunk]) => {
+                    if ( which !== 'data' ) {
+                        return { done: true, value: null };
+                    }
+                    return { value: chunk };
+                }),
+
+            };
+        }
+        console.log('!!!');
+
+        const final_promise = new TeePromise();
+        let current_cancel_ = () => {};
+
+        (async () => {
+            console.log('STARTING BUFFER READ');
+            let n_read = 0;
+            const chunks = [];
+            while ( n_read < opt_buffer.length ) {
+                const { promise, cancel } = this.channel_.get();
+                current_cancel_ = cancel;
+
+                let [which, chunk] = await promise;
+                console.log('which', which, 'chunk', chunk)
+                if ( which === 'done' ) {
+                    break;
+                }
+                if ( which === 'cancel' ) {
+                    this.channel_.pushback(...chunks);
+                    return 
+                }
+                if ( n_read + chunk.length > opt_buffer.length ) {
+                    const diff = opt_buffer.length - n_read;
+                    this.channel_.pushback(chunk.subarray(diff));
+                    chunk = chunk.subarray(0, diff);
+                }
+                chunks.push(chunk);
+                console.log('calling set', chunk, n_read, opt_buffer.length);
+                opt_buffer.set(chunk, n_read);
+                n_read += chunk.length;
+            }
+
+            console.log('RESOLVING', opt_buffer);
+            console.log('-- and channel?', this.channel_.chunks_.length);
+
+            final_promise.resolve({ n_read });
+        })();
+
+        return {
+            cancel: () => {
+                current_cancel_();
+            },
+            promise: final_promise,
+        };
+
+        // --- everything below this line is being removed ---
+
+        /*
         if ( ! opt_buffer && this.chunks_.length === 0 ) {
             const chunk = await this.delegate.read();
             if ( cancel_state?.cancelled ) {
                 // push the chunk back onto the queue
-                this.chunks_.push(chunk.value);
+                console.log('aaa', chunk);
+                this.chunks_.push(chunk);
                 return this._create_cancel_response();
             }
             return {
@@ -56,8 +245,10 @@ export class BetterReader {
         }
 
         const chunk = await this.getChunk_();
+        console.log('what we got', chunk);
         if ( cancel_state?.cancelled ) {
             // push the chunk back onto the queue
+            console.log('bbb', chunk);
             this.chunks_.push(chunk);
             return this._create_cancel_response();
         }
@@ -70,30 +261,49 @@ export class BetterReader {
             return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } };
         }
 
+        console.log('ccc', chunk);
         this.chunks_.push(chunk);
 
+        let itermax = 20;
         while ( this.getTotalBytesReady_() < opt_buffer.length ) {
+            if ( --itermax < 0 ) {
+                throw new Error('too many iterations');
+            }
+            console.log('iter b');
             const read_chunk = await this.getChunk_();
             if ( cancel_state?.cancelled ) {
                 // push the chunk back onto the queue
+                console.log('ddd', chunk);
                 this.chunks_.push(read_chunk);
                 return this._create_cancel_response();
             }
             if ( ! read_chunk ) {
                 break;
             }
+            console.log('adding chunk', read_chunk)
             this.chunks_.push(read_chunk);
         }
 
         let offset = 0;
         while ( this.chunks_.length > 0 && offset < opt_buffer.length ) {
+            console.log('iter a')
             let item = this.chunks_.shift();
             if ( item === undefined ) {
+                console.log('undefined <A>', this.chunks_);
+                break;
+            }
+            if ( item.value === undefined ) {
+                console.log('undefined <B>', item, this.chunks_);
                 break;
             }
+            const is_done = item.done;
+            item = item.value;
             if ( offset + item.length > opt_buffer.length ) {
                 const diff = opt_buffer.length - offset;
-                this.chunks_.unshift(item.subarray(diff));
+                this.chunks_.unshift({
+                    done: is_done,
+                    value: item.subarray(diff),
+                });
                 item = item.subarray(0, diff);
             }
             opt_buffer.set(item, offset);
@@ -104,24 +314,28 @@ export class BetterReader {
             n_read: offset,
             debug_meta: { source: 'stored chunks', returning: 'byte count' },
         };
+        /**/
     }
 
     read_with_cancel (opt_buffer) {
-        const cancel_state = { cancelled: false };
-        const promise = (async () => {
-            const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
-            return opt_buffer ? n_read : chunk;
-        })();
+        console.log('read with cancel called');
+        const o = this.read_and_get_info(opt_buffer);
+        const { cancel, promise } = o;
+        promise.then(v => {
+            console.log('promise resolved', v);
+        });
+        // const promise = (async () => {
+        //     const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
+        //     return opt_buffer ? n_read : chunk;
+        // })();
         return {
-            canceller: () => {
-                cancel_state.cancelled = true;
-            },
+            cancel,
             promise,
         };
     }
 
     async read (opt_buffer) {
-        const { chunk, n_read } = await this.read_and_get_info(opt_buffer);
+        const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise;
         return opt_buffer ? n_read : chunk;
     }
 
@@ -133,17 +347,19 @@ export class BetterReader {
                 delegate: delegate_read,
                 buffer_not_empty: this.waitUntilDataAvailable(),
             });
+            console.log('which?', which);
             if (which === 'delegate') {
-                return result.value;
+                return result;
             }
+
             // There's a chunk in the buffer now, so we can use the regular path.
             // But first, make sure that once the delegate read completes, we save the chunk.
-            delegate_read.then((chunk) => {
-                this.chunks_.push(chunk.value);
-            })
+            console.log('result', result)
+            this.chunks_.push(result);
         }
 
         const len = this.getTotalBytesReady_();
+        console.log('len', len);
         const merged = new Uint8Array(len);
         let offset = 0;
         for ( const item of this.chunks_ ) {
@@ -157,7 +373,10 @@ export class BetterReader {
     }
 
     getTotalBytesReady_ () {
-        return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0);
+        return this.chunks_.reduce((sum, chunk) => {
+            console.log('sum', sum, 'chunk', chunk);
+            return sum + chunk.value.length
+        }, 0);
     }
 
     canRead() {

+ 7 - 4
packages/phoenix/src/ansi-shell/pipeline/Coupler.js

@@ -47,20 +47,22 @@ export class Coupler {
     async listenLoop_ () {
         this.active = true;
         for (;;) {
-            let canceller = () => {};
+            let cancel = () => {};
             let promise;
             if ( this.source.read_with_cancel !== undefined ) {
-                ({ canceller, promise } = this.source.read_with_cancel());
+                ({ cancel, promise } = this.source.read_with_cancel());
             } else {
                 promise = this.source.read();
             }
-            const [which, { value, done }] = await raceCase({
+            const [which, result] = await raceCase({
                 source: promise,
                 closed: this.closed_,
             });
+            console.log('result?', which, result);
+            const { value, done } = result;
             if ( done ) {
                 if ( which === 'closed' ) {
-                    canceller();
+                    cancel();
                 }
                 this.source = null;
                 this.target = null;
@@ -69,6 +71,7 @@ export class Coupler {
                 break;
             }
             if ( this.on_ ) {
+                if ( ! value ) debugger;
                 await this.target.write(value);
             }
         }

+ 7 - 0
packages/phoenix/src/ansi-shell/pipeline/Pipeline.js

@@ -285,7 +285,9 @@ export class PreparedCommand {
 
         let exit_code = 0;
         try {
+            console.log(`awaiting execute for ${command.name}`)
             await execute(ctx);
+            console.log(`DONE execute for ${command.name}`)
             valve.close();
         } catch (e) {
             if ( e instanceof Exit ) {
@@ -304,10 +306,12 @@ export class PreparedCommand {
                 );
                 ctx.locals.exit = -1;
             }
+            if ( ! (e instanceof Exit) ) console.error(e);
         }
 
         // ctx.externs.in?.close?.();
         // ctx.externs.out?.close?.();
+        console.log(`calling close for ${command.name}`, ctx.externs.out);
         await ctx.externs.out.close();
 
         // TODO: need write command from puter-shell before this can be done
@@ -382,8 +386,11 @@ export class Pipeline {
             const command = preparedCommands[i];
             commandPromises.push(command.execute());
         }
+        console.log('command promises', commandPromises);
         await Promise.all(commandPromises);
 
+        console.log('|AWAIT COUPLER');
         await coupler.isDone;
+        console.log('|DONE AWAIT COUPLER');
     }
 }

+ 1 - 0
packages/phoenix/src/ansi-shell/readline/readline.js

@@ -58,6 +58,7 @@ const ReadlineProcessorBuilder = builder => builder
 
         const byteBuffer = new Uint8Array(1);
         await externs.in_.read(byteBuffer);
+        console.log('got a byte!', byteBuffer[0]);
         locals.byteBuffer = byteBuffer;
         locals.byte = byteBuffer[0];
     })