|
@@ -16,59 +16,233 @@
|
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
*/
|
|
|
+import { TeePromise, raceCase } from '../../src/promise.js';
|
|
|
+
|
|
|
const encoder = new TextEncoder();
|
|
|
|
|
|
const CHAR_LF = '\n'.charCodeAt(0);
|
|
|
const CHAR_CR = '\r'.charCodeAt(0);
|
|
|
|
|
|
+const DONE = Symbol('done');
|
|
|
+
|
|
|
+class Channel {
|
|
|
+ constructor () {
|
|
|
+ this.chunks_ = [];
|
|
|
+
|
|
|
+ globalThis.chnl = this;
|
|
|
+
|
|
|
+ const events = ['write','consume','change'];
|
|
|
+ for ( const event of events ) {
|
|
|
+ this[`on_${event}_`] = [];
|
|
|
+ this[`emit_${event}_`] = () => {
|
|
|
+ for ( const listener of this[`on_${event}_`] ) {
|
|
|
+ listener();
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ this.on('write', () => { this.emit_change_(); });
|
|
|
+ this.on('consume', () => { this.emit_change_(); });
|
|
|
+ }
|
|
|
+
|
|
|
+ on (event, listener) {
|
|
|
+ this[`on_${event}_`].push(listener);
|
|
|
+ }
|
|
|
+
|
|
|
+ off (event, listener) {
|
|
|
+ const index = this[`on_${event}_`].indexOf(listener);
|
|
|
+ if ( index !== -1 ) {
|
|
|
+ this[`on_${event}_`].splice(index, 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ get () {
|
|
|
+ const cancel = new TeePromise();
|
|
|
+ const data = new TeePromise();
|
|
|
+ const done = new TeePromise();
|
|
|
+
|
|
|
+ let called = 0;
|
|
|
+
|
|
|
+ const on_data = () => {
|
|
|
+ if ( this.chunks_.length > 0 ) {
|
|
|
+ if ( called > 0 ) {
|
|
|
+ throw new Error('called more than once');
|
|
|
+ }
|
|
|
+ called++;
|
|
|
+ const chunk = this.chunks_.shift();
|
|
|
+ ( chunk === DONE ? done : data ).resolve(chunk);
|
|
|
+ this.off('write', on_data);
|
|
|
+ this.emit_consume_();
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ this.on('write', on_data);
|
|
|
+ on_data();
|
|
|
+
|
|
|
+ const to_return = {
|
|
|
+ cancel: () => {
|
|
|
+ this.off('write', on_data);
|
|
|
+ cancel.resolve();
|
|
|
+ },
|
|
|
+ promise: raceCase({
|
|
|
+ cancel,
|
|
|
+ data,
|
|
|
+ done,
|
|
|
+ }),
|
|
|
+ };
|
|
|
+
|
|
|
+ return to_return;
|
|
|
+ }
|
|
|
+
|
|
|
+ write (chunk) {
|
|
|
+ this.chunks_.push(chunk);
|
|
|
+ this.emit_write_();
|
|
|
+ }
|
|
|
+
|
|
|
+ pushback (...chunks) {
|
|
|
+ for ( let i = chunks.length - 1; i >= 0; i-- ) {
|
|
|
+ this.chunks_.unshift(chunks[i]);
|
|
|
+
|
|
|
+ }
|
|
|
+ this.emit_write_();
|
|
|
+ }
|
|
|
+
|
|
|
+ is_empty () {
|
|
|
+ return this.chunks_.length === 0;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
export class BetterReader {
|
|
|
constructor ({ delegate }) {
|
|
|
this.delegate = delegate;
|
|
|
this.chunks_ = [];
|
|
|
+ this.channel_ = new Channel();
|
|
|
+
|
|
|
+ this._init();
|
|
|
}
|
|
|
|
|
|
- async read (opt_buffer) {
|
|
|
- if ( ! opt_buffer && this.chunks_.length === 0 ) {
|
|
|
- return await this.delegate.read();
|
|
|
+ _init () {
|
|
|
+ let working = Promise.resolve();
|
|
|
+ this.channel_.on('consume', async () => {
|
|
|
+ await working;
|
|
|
+ working = new TeePromise();
|
|
|
+ if ( this.channel_.is_empty() ) {
|
|
|
+ await this.intake_();
|
|
|
+ }
|
|
|
+ working.resolve();
|
|
|
+ });
|
|
|
+ this.intake_();
|
|
|
+ }
|
|
|
+
|
|
|
+ async intake_ () {
|
|
|
+ const { value, done } = await this.delegate.read();
|
|
|
+ if ( done ) {
|
|
|
+ this.channel_.write(DONE);
|
|
|
+ return;
|
|
|
}
|
|
|
+ this.channel_.write(value);
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
- const chunk = await this.getChunk_();
|
|
|
+ _create_cancel_response () {
|
|
|
+ return {
|
|
|
+ chunk: null,
|
|
|
+ n_read: 0,
|
|
|
+ debug_meta: {
|
|
|
+ source: 'delegate',
|
|
|
+ returning: 'cancelled',
|
|
|
+ this_value_should_not_be_used: true,
|
|
|
+ },
|
|
|
+ };
|
|
|
+ }
|
|
|
|
|
|
+ read_and_get_info (opt_buffer, cancel_state) {
|
|
|
if ( ! opt_buffer ) {
|
|
|
- return chunk;
|
|
|
+ const { promise, cancel } = this.channel_.get();
|
|
|
+ return {
|
|
|
+ cancel,
|
|
|
+ promise: promise.then(([which, chunk]) => {
|
|
|
+ if ( which !== 'data' ) {
|
|
|
+ return { done: true, value: null };
|
|
|
+ }
|
|
|
+ return { value: chunk };
|
|
|
+ }),
|
|
|
+
|
|
|
+ };
|
|
|
}
|
|
|
|
|
|
- this.chunks_.push(chunk);
|
|
|
+ const final_promise = new TeePromise();
|
|
|
+ let current_cancel_ = () => {};
|
|
|
|
|
|
- while ( this.getTotalBytesReady_() < opt_buffer.length ) {
|
|
|
- this.chunks_.push(await this.getChunk_())
|
|
|
- }
|
|
|
+ (async () => {
|
|
|
+ let n_read = 0;
|
|
|
+ const chunks = [];
|
|
|
+ while ( n_read < opt_buffer.length ) {
|
|
|
+ const { promise, cancel } = this.channel_.get();
|
|
|
+ current_cancel_ = cancel;
|
|
|
|
|
|
- // TODO: need to handle EOT condition in this loop
|
|
|
- let offset = 0;
|
|
|
- for (;;) {
|
|
|
- let item = this.chunks_.shift();
|
|
|
- if ( item === undefined ) {
|
|
|
- throw new Error('calculation is wrong')
|
|
|
- }
|
|
|
- if ( offset + item.length > opt_buffer.length ) {
|
|
|
- const diff = opt_buffer.length - offset;
|
|
|
- this.chunks_.unshift(item.subarray(diff));
|
|
|
- item = item.subarray(0, diff);
|
|
|
+ let [which, chunk] = await promise;
|
|
|
+ if ( which === 'done' ) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if ( which === 'cancel' ) {
|
|
|
+ this.channel_.pushback(...chunks);
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if ( n_read + chunk.length > opt_buffer.length ) {
|
|
|
+ const diff = opt_buffer.length - n_read;
|
|
|
+ this.channel_.pushback(chunk.subarray(diff));
|
|
|
+ chunk = chunk.subarray(0, diff);
|
|
|
+ }
|
|
|
+ chunks.push(chunk);
|
|
|
+ opt_buffer.set(chunk, n_read);
|
|
|
+ n_read += chunk.length;
|
|
|
}
|
|
|
- opt_buffer.set(item, offset);
|
|
|
- offset += item.length;
|
|
|
|
|
|
- if ( offset == opt_buffer.length ) break;
|
|
|
- }
|
|
|
+ final_promise.resolve({ n_read });
|
|
|
+ })();
|
|
|
|
|
|
- // return opt_buffer.length;
|
|
|
+ return {
|
|
|
+ cancel: () => {
|
|
|
+ current_cancel_();
|
|
|
+ },
|
|
|
+ promise: final_promise,
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ read_with_cancel (opt_buffer) {
|
|
|
+ const o = this.read_and_get_info(opt_buffer);
|
|
|
+ const { cancel, promise } = o;
|
|
|
+ // const promise = (async () => {
|
|
|
+ // const { chunk, n_read } = await this.read_and_get_info(opt_buffer, cancel_state);
|
|
|
+ // return opt_buffer ? n_read : chunk;
|
|
|
+ // })();
|
|
|
+ return {
|
|
|
+ cancel,
|
|
|
+ promise,
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ async read (opt_buffer) {
|
|
|
+ const { chunk, n_read } = await this.read_and_get_info(opt_buffer).promise;
|
|
|
+ return opt_buffer ? n_read : chunk;
|
|
|
}
|
|
|
|
|
|
async getChunk_() {
|
|
|
if ( this.chunks_.length === 0 ) {
|
|
|
- const { value } = await this.delegate.read();
|
|
|
- return value;
|
|
|
+ // Wait for either a delegate read to happen, or for a chunk to be added to the buffer from a cancelled read.
|
|
|
+ const delegate_read = this.delegate.read();
|
|
|
+ const [which, result] = await raceCase({
|
|
|
+ delegate: delegate_read,
|
|
|
+ buffer_not_empty: this.waitUntilDataAvailable(),
|
|
|
+ });
|
|
|
+ if (which === 'delegate') {
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ // There's a chunk in the buffer now, so we can use the regular path.
|
|
|
+ // But first, make sure that once the delegate read completes, we save the chunk.
|
|
|
+ this.chunks_.push(result);
|
|
|
}
|
|
|
|
|
|
const len = this.getTotalBytesReady_();
|
|
@@ -85,7 +259,33 @@ export class BetterReader {
|
|
|
}
|
|
|
|
|
|
getTotalBytesReady_ () {
|
|
|
- return this.chunks_.reduce((sum, chunk) => sum + chunk.length, 0);
|
|
|
+ return this.chunks_.reduce((sum, chunk) => {
|
|
|
+ return sum + chunk.value.length
|
|
|
+ }, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ canRead() {
|
|
|
+ return this.getTotalBytesReady_() > 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ async waitUntilDataAvailable() {
|
|
|
+ let resolve_promise;
|
|
|
+ let reject_promise;
|
|
|
+ const promise = new Promise((resolve, reject) => {
|
|
|
+ resolve_promise = resolve;
|
|
|
+ reject_promise = reject;
|
|
|
+ });
|
|
|
+
|
|
|
+ const check = () => {
|
|
|
+ if (this.canRead()) {
|
|
|
+ resolve_promise();
|
|
|
+ } else {
|
|
|
+ setTimeout(check, 0);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ setTimeout(check, 0);
|
|
|
+
|
|
|
+ await promise;
|
|
|
}
|
|
|
}
|
|
|
|