/*
* Copyright (C) 2024 Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
class TeePromise {
static STATUS_PENDING = Symbol('pending');
static STATUS_RUNNING = {};
static STATUS_DONE = Symbol('done');
constructor () {
this.status_ = this.constructor.STATUS_PENDING;
this.donePromise = new Promise((resolve, reject) => {
this.doneResolve = resolve;
this.doneReject = reject;
});
}
get status () {
return this.status_;
}
set status (status) {
this.status_ = status;
if ( status === this.constructor.STATUS_DONE ) {
this.doneResolve();
}
}
resolve (value) {
this.status_ = this.constructor.STATUS_DONE;
this.doneResolve(value);
}
awaitDone () {
return this.donePromise;
}
then (fn, ...a) {
return this.donePromise.then(fn, ...a);
}
reject (err) {
this.status_ = this.constructor.STATUS_DONE;
this.doneReject(err);
}
/**
* @deprecated use then() instead
*/
onComplete(fn) {
return this.then(fn);
}
}
class Lock {
constructor() {
this._locked = false;
this._waiting = [];
}
async acquire(callback) {
await new Promise(resolve => {
if ( ! this._locked ) {
this._locked = true;
resolve();
} else {
this._waiting.push({
resolve,
});
}
})
if ( callback ) {
let retval;
try {
retval = await callback();
} finally {
this.release();
}
return retval;
}
}
release() {
if (this._waiting.length > 0) {
const { resolve } = this._waiting.shift();
resolve();
} else {
this._locked = false;
}
}
}
/**
* @callback behindScheduleCallback
* @param {number} drift - The number of milliseconds that the callback was
* called behind schedule.
* @returns {boolean} - If the callback returns true, the timer will be
* cancelled.
*/
/**
* When passing an async callback to setInterval, it's possible for the
* callback to be called again before the previous invocation has finished.
*
* This function wraps setInterval and ensures that the callback is not
* called again until the previous invocation has finished.
*
* @param {Function} callback - The function to call when the timer elapses.
* @param {number} delay - The minimum number of milliseconds between invocations.
* @param {?Array} args - Additional arguments to pass to setInterval.
* @param {?Object} options - Additional options.
* @param {behindScheduleCallback} options.onBehindSchedule - A callback to call when the callback is called behind schedule.
*/
const asyncSafeSetInterval = async (callback, delay, args, options) => {
args = args ?? [];
options = options ?? {};
const { onBehindSchedule } = options;
const sleep = (ms) => new Promise(rslv => setTimeout(rslv, ms));
for ( ;; ) {
await sleep(delay);
const ts_start = Date.now();
await callback(...args);
const ts_end = Date.now();
const runtime = ts_end - ts_start;
const sleep_time = delay - runtime;
if ( sleep_time < 0 ) {
if ( onBehindSchedule ) {
const cancel = await onBehindSchedule(-sleep_time);
if ( cancel ) {
return;
}
}
} else {
await sleep(sleep_time);
}
}
}
/**
* raceCase is like Promise.race except it takes an object instead of
* an array, and returns the key of the promise that resolves first
* as well as the value that it resolved to.
*
* @param {Object.} promise_map
*
* @returns {Promise.<[string, any]>}
*/
const raceCase = async (promise_map) => {
return Promise.race(Object.entries(promise_map).map(
([key, promise]) => promise.then(value => [key, value])));
};
module.exports = {
TeePromise,
Lock,
asyncSafeSetInterval,
raceCase,
};