Просмотр исходного кода

Add a valve and internal pipe to commands

KernelDeimos 1 год назад
Родитель
Сommit
da208e23f5

+ 14 - 1
packages/phoenix/src/ansi-shell/pipeline/Coupler.js

@@ -16,6 +16,8 @@
  * You should have received a copy of the GNU Affero General Public License
  * You should have received a copy of the GNU Affero General Public License
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
  */
  */
+import { TeePromise } from "../../promise.js";
+
 export class Coupler {
 export class Coupler {
     static description = `
     static description = `
         Connects a read stream to a write stream.
         Connects a read stream to a write stream.
@@ -26,6 +28,7 @@ export class Coupler {
         this.source = source;
         this.source = source;
         this.target = target;
         this.target = target;
         this.on_ = true;
         this.on_ = true;
+        this.closed_ = new TeePromise();
         this.isDone = new Promise(rslv => {
         this.isDone = new Promise(rslv => {
             this.resolveIsDone = rslv;
             this.resolveIsDone = rslv;
         })
         })
@@ -35,10 +38,20 @@ export class Coupler {
     off () { this.on_ = false; }
     off () { this.on_ = false; }
     on () { this.on_ = true; }
     on () { this.on_ = true; }
 
 
+    close () {
+        this.closed_.resolve({
+            value: undefined,
+            done: true,
+        });
+    }
+
     async listenLoop_ () {
     async listenLoop_ () {
         this.active = true;
         this.active = true;
         for (;;) {
         for (;;) {
-            const { value, done } = await this.source.read();
+            const { value, done } = await Promise.race([
+                this.closed_,
+                this.source.read(),
+            ]);
             if ( done ) {
             if ( done ) {
                 this.source = null;
                 this.source = null;
                 this.target = null;
                 this.target = null;

+ 5 - 0
packages/phoenix/src/ansi-shell/pipeline/Pipeline.js

@@ -202,6 +202,10 @@ export class PreparedCommand {
             in_ = new MemReader(response);
             in_ = new MemReader(response);
         }
         }
 
 
+        const internal_input_pipe = new Pipe();
+        const valve = new Coupler(in_, internal_input_pipe.in);
+        in_ = internal_input_pipe.out;
+
         // simple naive implementation for now
         // simple naive implementation for now
         const sig = {
         const sig = {
             listeners_: [],
             listeners_: [],
@@ -297,6 +301,7 @@ export class PreparedCommand {
         let exit_code = 0;
         let exit_code = 0;
         try {
         try {
             await execute(ctx);
             await execute(ctx);
+            valve.close();
         } catch (e) {
         } catch (e) {
             if ( e instanceof Exit ) {
             if ( e instanceof Exit ) {
                 exit_code = e.code;
                 exit_code = e.code;

+ 43 - 0
packages/phoenix/src/promise.js

@@ -0,0 +1,43 @@
+export class TeePromise {
+    static STATUS_PENDING = Symbol('pending');
+    static STATUS_RUNNING = {};
+    static STATUS_DONE = Symbol('done');
+    constructor () {
+        this.status_ = this.constructor.STATUS_PENDING;
+        this.donePromise = new Promise((resolve, reject) => {
+            this.doneResolve = resolve;
+            this.doneReject = reject;
+        });
+    }
+    get status () {
+        return this.status_;
+    }
+    set status (status) {
+        this.status_ = status;
+        if ( status === this.constructor.STATUS_DONE ) {
+            this.doneResolve();
+        }
+    }
+    resolve (value) {
+        this.status_ = this.constructor.STATUS_DONE;
+        this.doneResolve(value);
+    }
+    awaitDone () {
+        return this.donePromise;
+    }
+    then (fn, ...a) {
+        return this.donePromise.then(fn, ...a);
+    }
+
+    reject (err) {
+        this.status_ = this.constructor.STATUS_DONE;
+        this.doneReject(err);
+    }
+
+    /**
+     * @deprecated use then() instead
+     */
+    onComplete(fn) {
+        return this.then(fn);
+    }
+}