BatchExecutor.js 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. /*
  2. * Copyright (C) 2024 Puter Technologies Inc.
  3. *
  4. * This file is part of Puter.
  5. *
  6. * Puter is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published
  8. * by the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. */
  19. const { AdvancedBase } = require('puter-js-common');
  20. const PathResolver = require('../../routers/filesystem_api/batch/PathResolver');
  21. const commands = require('./commands').commands;
  22. const { WorkUnit } = require('../../services/runtime-analysis/ExpectationService');
  23. const APIError = require('../../api/APIError');
  24. const { Context } = require('../../util/context');
  25. const config = require('../../config');
  26. const { TeePromise } = require('../../util/promise');
  /**
   * Executes the individual operations of a batch request while limiting how
   * many of them run at the same time, and tracks progress / error state for
   * the batch as a whole.
   */
  class BatchExecutor extends AdvancedBase {
      /**
       * @param {object} x - context object; `x.get()` and `x.arun()` are used
       * @param {object} params
       * @param {object} params.user - user the batch runs as; handed to the
       *     PathResolver and to every command
       * @param {object} params.log - logger; `noticeme()` is used for progress
       * @param {object} params.errors - error reporter; `report()` is used
       */
      constructor (x, { user, log, errors }) {
          super();
          this.x = x;
          this.user = user;
          this.pathResolver = new PathResolver({ user });
          this.expectations = x.get('services').get('expectations');
          this.log = log;
          this.errors = errors;

          // Not used inside this class; kept because code outside this file
          // may read it.
          this.responsePromises = [];

          // Set to true as soon as any operation fails.
          this.hasError = false;

          // Progress bookkeeping. `total` / `total_tbd` are only read here
          // (for the progress message); presumably the caller updates them
          // once the number of operations is known -- verify against caller.
          this.total_tbd = true;
          this.total = 0;
          this.counter = 0;

          // Concurrency limiting: number of operations currently holding a
          // slot, the maximum allowed, and the promise that waiters block on
          // while no slot is free (null when nobody is waiting).
          this.concurrent_ops = 0;
          this.max_concurrent_ops = 20;
          this.ops_promise = null;
      }

      /**
       * Resolves the next time a running operation releases its slot.
       * All waiters share the same promise, so a caller must re-check the
       * slot count after this resolves.
       *
       * @returns {Promise<void>}
       */
      async ready_for_more () {
          if ( this.ops_promise === null ) {
              this.ops_promise = new TeePromise();
          }
          await this.ops_promise;
      }

      /**
       * Runs a single batch operation once a concurrency slot is available.
       *
       * Note: `op` is mutated. Its `op` property is removed and the socket
       * ids from `req.body` are copied onto it before it is handed to the
       * command.
       *
       * @param {object} req - request; `req.body.original_client_socket_id`
       *     and `req.body.socket_id` are read
       * @param {object} op - operation description; `op.op` selects the command
       * @param {object} [file] - uploaded file belonging to this operation, if
       *     any; `file.stream` is drained when the operation fails
       * @returns {Promise<*>} the command's `result` value, or the serialized
       *     APIError when the operation failed
       */
      async exec_op (req, op, file) {
          // Wait for a free slot. Every release wakes all waiters, so the
          // condition has to be re-checked after each wake-up.
          while ( this.concurrent_ops >= this.max_concurrent_ops ) {
              await this.ready_for_more();
          }
          this.concurrent_ops++;

          // From here on the slot is held; everything runs inside
          // try/finally so that a failure during setup (missing req.body,
          // missing context, ...) cannot leak the slot and eventually stall
          // the whole batch.
          let dev_widget = null;
          let result;
          try {
              if ( config.env === 'dev' ) {
                  const widget = this.x.get('dev_batch-widget');
                  widget.ops++;
                  // Only remember the widget once the increment succeeded, so
                  // the decrement in `finally` is always balanced.
                  dev_widget = widget;
              }

              const op_name = op.op;
              const command_cls = commands[op_name];
              if ( config.env === 'dev' ) {
                  // Debug aid only; op payloads are not dumped outside dev.
                  console.log(command_cls, JSON.stringify(op, null, 2));
              }
              delete op.op;

              const workUnit = WorkUnit.create();
              this.expectations.expect_eventually({
                  workUnit,
                  checkpoint: 'operation responded'
              });

              // TEMP: event service will handle this
              op.original_client_socket_id = req.body.original_client_socket_id;
              op.socket_id = req.body.socket_id;

              // run the operation
              result = await this.x.arun(async () => {
                  if ( ! Context.get() ) throw new Error('no context');

                  try {
                      // Covers unknown names as well as names that resolve to
                      // inherited properties (e.g. "constructor").
                      if ( typeof command_cls?.run !== 'function' ) {
                          throw new Error(`unknown batch operation: ${op_name}`);
                      }

                      if ( file ) {
                          workUnit.checkpoint(
                              'about to run << ' +
                              (file.originalname ?? file.name) +
                              ' >> ' +
                              JSON.stringify(op)
                          );
                      }

                      const command_ins = await command_cls.run({
                          getFile: () => file,
                          pathResolver: this.pathResolver,
                          user: this.user
                      }, op);
                      workUnit.checkpoint('operation invoked');

                      const res = await command_ins.awaitValue('result');
                      workUnit.checkpoint('operation responded');
                      return res;
                  } catch (e) {
                      this.hasError = true;

                      let api_error = e;
                      if ( ! ( e instanceof APIError ) ) {
                          // TODO: alarm condition
                          this.errors.report('batch-operation', {
                              source: e,
                              trace: true,
                              alarm: true,
                          });
                          api_error = APIError.adapt(e);
                      }

                      // Consume the rest of the upload so the incoming
                      // request is not left with unread data.
                      if ( file ) {
                          try {
                              await this._drain_file_stream(file);
                          } catch (drain_error) {
                              this.errors.report('batch-operation-2', {
                                  source: drain_error,
                                  trace: true,
                                  alarm: true,
                              });
                          }
                      }

                      if ( config.env === 'dev' ) {
                          console.error(api_error);
                      }

                      // Failures are returned (not thrown) so one failed
                      // operation does not reject the caller.
                      return api_error.serialize();
                  }
              });
          } finally {
              this._release_slot(dev_widget);
          }

          // decorate with logging
          this.counter++;
          const { log, total, total_tbd, counter } = this;
          const total_str = total_tbd ? `TBD(>${total})` : `${total}`;
          log.noticeme(`Batch Progress: ${counter} / ${total_str} operations`);

          return result;
      }

      /**
       * Gives back the concurrency slot taken by `exec_op` and wakes waiters.
       *
       * @param {object|null} dev_widget - the dev widget whose `ops` counter
       *     was incremented for this operation, or null if it was not
       */
      _release_slot (dev_widget) {
          if ( dev_widget ) dev_widget.ops--;

          this.concurrent_ops--;
          if ( this.ops_promise && this.concurrent_ops < this.max_concurrent_ops ) {
              // Clear the shared promise before waiters resume so that any
              // waiter that still finds no room creates a fresh one.
              const waiters = this.ops_promise;
              this.ops_promise = null;
              waiters.resolve();
          }
      }

      /**
       * Reads `file.stream` to completion, discarding the data.
       *
       * Returns immediately when the stream has already ended or been
       * destroyed; in that state 'end' will not fire again and waiting for it
       * would never finish. Assumes `file.stream` is a Node.js Readable --
       * confirm against the upload middleware.
       *
       * @param {object} file - object carrying the `stream` to drain
       * @returns {Promise<void>} rejects if the stream emits 'error'
       */
      async _drain_file_stream (file) {
          const stream = file.stream;
          if ( stream.readableEnded || stream.destroyed ) return;

          await new Promise((resolve, reject) => {
              stream.once('end', resolve);
              // A stream destroyed without an error emits 'close' only.
              stream.once('close', resolve);
              stream.once('error', reject);
              stream.resume();
          });
      }
  }
  // Public surface of this module: the executor class, as a named export.
  module.exports = { BatchExecutor };