Przeglądaj źródła

Make BetterReader buffer and cancel, to fix stdin data loss

BetterReader.read_with_cancel() returns both the read promise, and a
function that can be used to cancel the read. A cancelled read is
placed back into the BetterReader's chunk buffer, to be consumed by the
next user that requests a read.

This is used by Coupler so that when the coupler is closed, its pending
read() call does not consume the next batch of input.

This fixes the problem we were having with child applications consuming
one chunk of stdin after they are closed, meaning the first key you
press after an app exits would disappear.

Co-authored-by: KernelDeimos <eric.alex.dube@gmail.com>
Sam Atkins 1 rok temu
rodzic
commit
072dbe8db5

+ 108 - 12
packages/phoenix/packages/pty/exports.js

@@ -16,6 +16,8 @@
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
+import { raceCase } from '../../src/promise.js';
+
 const encoder = new TextEncoder();
 
 const CHAR_LF = '\n'.charCodeAt(0);
@@ -27,29 +29,67 @@ export class BetterReader {
         this.chunks_ = [];
     }
 
-    async read (opt_buffer) {
+    _create_cancel_response () {
+        return {
+            chunk: null,
+            n_read: 0,
+            debug_meta: {
+                source: 'delegate',
+                returning: 'cancelled',
+                this_value_should_not_be_used: true,
+            },
+        };
+    }
+
+    async read_and_get_info (opt_buffer, cancel_state) {
         if ( ! opt_buffer && this.chunks_.length === 0 ) {
-            return await this.delegate.read();
+            const chunk = await this.delegate.read();
+            if ( cancel_state?.cancelled ) {
+                // push the chunk back onto the queue
+                this.chunks_.push(chunk.value);
+                return this._create_cancel_response();
+            }
+            return {
+                chunk,
+                debug_meta: { source: 'delegate' },
+            };
         }
 
         const chunk = await this.getChunk_();
+        if ( cancel_state?.cancelled ) {
+            // push the chunk back onto the queue
+            this.chunks_.push(chunk);
+            return this._create_cancel_response();
+        }
 
         if ( ! opt_buffer ) {
-            return chunk;
+            return { chunk, debug_meta: { source: 'stored chunks', returning: 'chunk' } };
+        }
+
+        if ( ! chunk ) {
+            return { n_read: 0, debug_meta: { source: 'nothing', returning: 'byte count' } };
         }
 
         this.chunks_.push(chunk);
 
         while ( this.getTotalBytesReady_() < opt_buffer.length ) {
-            this.chunks_.push(await this.getChunk_())
+            const read_chunk = await this.getChunk_();
+            if ( cancel_state?.cancelled ) {
+                // push the chunk back onto the queue
+                this.chunks_.push(read_chunk);
+                return this._create_cancel_response();
+            }
+            if ( ! read_chunk ) {
+                break;
+            }
+            this.chunks_.push(read_chunk);
         }
 
-        // TODO: need to handle EOT condition in this loop
         let offset = 0;
-        for (;;) {
+        while ( this.chunks_.length > 0 && offset < opt_buffer.length ) {
             let item = this.chunks_.shift();
             if ( item === undefined ) {
-                throw new Error('calculation is wrong')
+                break;
             }
             if ( offset + item.length > opt_buffer.length ) {
                 const diff = opt_buffer.length - offset;
@@ -58,17 +98,49 @@ export class BetterReader {
             }
             opt_buffer.set(item, offset);
             offset += item.length;
-
-            if ( offset == opt_buffer.length ) break;
         }
 
-        // return opt_buffer.length;
+        return {
+            n_read: offset,
+            debug_meta: { source: 'stored chunks', returning: 'byte count' },
+        };
+    }
+
+    read_with_cancel (opt_buffer) {
+        const cancel_state = { cancelled: false };
+        const promise = (async () => {
+            const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
+            return opt_buffer ? n_read : chunk;
+        })();
+        return {
+            canceller: () => {
+                cancel_state.cancelled = true;
+            },
+            promise,
+        };
+    }
+
+    async read (opt_buffer) {
+        const { chunk, n_read } = await this.read_and_get_info(opt_buffer);
+        return opt_buffer ? n_read : chunk;
     }
 
     async getChunk_() {
         if ( this.chunks_.length === 0 ) {
-            const { value } = await this.delegate.read();
-            return value;
+            // Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read.
+            const delegate_read = this.delegate.read();
+            const [which, result] = await raceCase({
+                delegate: delegate_read,
+                buffer_not_empty: this.waitUntilDataAvailable(),
+            });
+            if (which === 'delegate') {
+                return result.value;
+            }
+            // There's a chunk in the buffer now, so we can use the regular path.
+            // But first, make sure that once the delegate read completes, we save the chunk.
+            delegate_read.then((chunk) => {
+                this.chunks_.push(chunk.value);
+            })
         }
 
         const len = this.getTotalBytesReady_();
@@ -87,6 +159,30 @@ export class BetterReader {
     getTotalBytesReady_ () {
         return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0);
     }
+
+    canRead() {
+        return this.getTotalBytesReady_() > 0;
+    }
+
+    async waitUntilDataAvailable() {
+        let resolve_promise;
+        let reject_promise;
+        const promise = new Promise((resolve, reject) => {
+            resolve_promise = resolve;
+            reject_promise = reject;
+        });
+
+        const check = () => {
+            if (this.canRead()) {
+                resolve_promise();
+            } else {
+                setTimeout(check, 0);
+            }
+        };
+        setTimeout(check, 0);
+
+        await promise;
+    }
 }
 
 /**

+ 15 - 6
packages/phoenix/src/ansi-shell/pipeline/Coupler.js

@@ -16,7 +16,7 @@
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
-import { TeePromise } from "../../promise.js";
+import { TeePromise, raceCase } from "../../promise.js";
 
 export class Coupler {
     static description = `
@@ -40,7 +40,6 @@ export class Coupler {
 
     close () {
         this.closed_.resolve({
-            value: undefined,
             done: true,
         });
     }
@@ -48,11 +47,21 @@ export class Coupler {
     async listenLoop_ () {
         this.active = true;
         for (;;) {
-            const { value, done } = await Promise.race([
-                this.closed_,
-                this.source.read(),
-            ]);
+            let canceller = () => {};
+            let promise;
+            if ( this.source.read_with_cancel !== undefined ) {
+                ({ canceller, promise } = this.source.read_with_cancel());
+            } else {
+                promise = this.source.read();
+            }
+            const [which, { value, done }] = await raceCase({
+                source: promise,
+                closed: this.closed_,
+            });
             if ( done ) {
+                if ( which === 'closed' ) {
+                    canceller();
+                }
                 this.source = null;
                 this.target = null;
                 this.active = false;

+ 14 - 0
packages/phoenix/src/promise.js

@@ -41,3 +41,17 @@ export class TeePromise {
         return this.then(fn);
     }
 }
+
+/**
+ * raceCase is like Promise.race except it takes an object instead of
+ * an array, and returns the key of the promise that resolves first
+ * as well as the value that it resolved to.
+ * 
+ * @param {Object.<string, Promise>} promise_map 
+ * 
+ * @returns {Promise.<[string, any]>}
+ */
+export const raceCase = async (promise_map) => {
+    return Promise.race(Object.entries(promise_map).map(
+        ([key, promise]) => promise.then(value => [key, value])));
+};