1
0

Pipeline.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. /*
  2. * Copyright (C) 2024 Puter Technologies Inc.
  3. *
  4. * This file is part of Phoenix Shell.
  5. *
  6. * Phoenix Shell is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published
  8. * by the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. */
  19. import { SyncLinesReader } from "../ioutil/SyncLinesReader.js";
  20. import { TOKENS } from "../readline/readtoken.js";
  21. import { ByteWriter } from "../ioutil/ByteWriter.js";
  22. import { Coupler } from "./Coupler.js";
  23. import { CommandStdinDecorator } from "./iowrappers.js";
  24. import { Pipe } from "./Pipe.js";
  25. import { MemReader } from "../ioutil/MemReader.js";
  26. import { MemWriter } from "../ioutil/MemWriter.js";
  27. import { MultiWriter } from "../ioutil/MultiWriter.js";
  28. import { NullifyWriter } from "../ioutil/NullifyWriter.js";
  29. import { ConcreteSyntaxError } from "../ConcreteSyntaxError.js";
  30. import { SignalReader } from "../ioutil/SignalReader.js";
  31. import { Exit } from "../../puter-shell/coreutils/coreutil_lib/exit.js";
  32. import { resolveRelativePath } from '../../util/path.js';
  33. import { printUsage } from '../../puter-shell/coreutils/coreutil_lib/help.js';
  34. class Token {
  35. static createFromAST (ctx, ast) {
  36. if ( ast.$ !== 'token' ) {
  37. throw new Error('expected token node');
  38. }
  39. console.log('ast has cst?',
  40. ast,
  41. ast.components?.[0]?.$cst
  42. )
  43. return new Token(ast);
  44. }
  45. constructor (ast) {
  46. this.ast = ast;
  47. this.$cst = ast.components?.[0]?.$cst;
  48. }
  49. maybeStaticallyResolve (ctx) {
  50. // If the only components are of type 'symbol' and 'string.segment'
  51. // then we can statically resolve the value of the token.
  52. console.log('checking viability of static resolve', this.ast)
  53. const isStatic = this.ast.components.every(c => {
  54. return c.$ === 'symbol' || c.$ === 'string.segment';
  55. });
  56. if ( ! isStatic ) return;
  57. console.log('doing static thing', this.ast)
  58. // TODO: Variables can also be statically resolved, I think...
  59. let value = '';
  60. for ( const component of this.ast.components ) {
  61. console.log('component', component);
  62. value += component.text;
  63. }
  64. return value;
  65. }
  66. async resolve (ctx) {
  67. let value = '';
  68. for ( const component of this.ast.components ) {
  69. if ( component.$ === 'string.segment' || component.$ === 'symbol' ) {
  70. value += component.text;
  71. continue;
  72. }
  73. if ( component.$ === 'pipeline' ) {
  74. const pipeline = await Pipeline.createFromAST(ctx, component);
  75. const memWriter = new MemWriter();
  76. const cmdCtx = { externs: { out: memWriter } }
  77. const subCtx = ctx.sub(cmdCtx);
  78. await pipeline.execute(subCtx);
  79. value += memWriter.getAsString().trimEnd();
  80. continue;
  81. }
  82. }
  83. // const name_subst = await PreparedCommand.createFromAST(this.ctx, command);
  84. // const memWriter = new MemWriter();
  85. // const cmdCtx = { externs: { out: memWriter } }
  86. // const ctx = this.ctx.sub(cmdCtx);
  87. // name_subst.setContext(ctx);
  88. // await name_subst.execute();
  89. // const cmd = memWriter.getAsString().trimEnd();
  90. return value;
  91. }
  92. }
  93. export class PreparedCommand {
  94. static async createFromAST (ctx, ast) {
  95. if ( ast.$ !== 'command' ) {
  96. throw new Error('expected command node');
  97. }
  98. ast = { ...ast };
  99. const command_token = Token.createFromAST(ctx, ast.tokens.shift());
  100. // TODO: check that node for command name is of a
  101. // supported type - maybe use adapt pattern
  102. console.log('ast?', ast);
  103. const cmd = command_token.maybeStaticallyResolve(ctx);
  104. const { commands } = ctx.registries;
  105. const { commandProvider } = ctx.externs;
  106. const command = cmd
  107. ? await commandProvider.lookup(cmd, { ctx })
  108. : command_token;
  109. if ( command === undefined ) {
  110. console.log('command token?', command_token);
  111. throw new ConcreteSyntaxError(
  112. `no command: ${JSON.stringify(cmd)}`,
  113. command_token.$cst,
  114. );
  115. throw new Error('no command: ' + JSON.stringify(cmd));
  116. }
  117. // TODO: test this
  118. console.log('ast?', ast);
  119. const inputRedirect = ast.inputRedirects.length > 0 ? (() => {
  120. const token = Token.createFromAST(ctx, ast.inputRedirects[0]);
  121. return token.maybeStaticallyResolve(ctx) ?? token;
  122. })() : null;
  123. // TODO: test this
  124. const outputRedirects = ast.outputRedirects.map(rdirNode => {
  125. const token = Token.createFromAST(ctx, rdirNode);
  126. return token.maybeStaticallyResolve(ctx) ?? token;
  127. });
  128. return new PreparedCommand({
  129. command,
  130. args: ast.tokens.map(node => Token.createFromAST(ctx, node)),
  131. // args: ast.args.map(node => node.text),
  132. inputRedirect,
  133. outputRedirects,
  134. });
  135. }
  136. constructor ({ command, args, inputRedirect, outputRedirects }) {
  137. this.command = command;
  138. this.args = args;
  139. this.inputRedirect = inputRedirect;
  140. this.outputRedirects = outputRedirects;
  141. }
  142. setContext (ctx) {
  143. this.ctx = ctx;
  144. }
  145. async execute () {
  146. let { command, args } = this;
  147. // If we have an AST node of type `command` it means we
  148. // need to run that command to get the name of the
  149. // command to run.
  150. if ( command instanceof Token ) {
  151. const cmd = await command.resolve(this.ctx);
  152. console.log('RUNNING CMD?', cmd)
  153. const { commandProvider } = this.ctx.externs;
  154. command = await commandProvider.lookup(cmd, { ctx: this.ctx });
  155. if ( command === undefined ) {
  156. throw new Error('no command: ' + JSON.stringify(cmd));
  157. }
  158. }
  159. args = await Promise.all(args.map(async node => {
  160. if ( node instanceof Token ) {
  161. return await node.resolve(this.ctx);
  162. }
  163. return node.text;
  164. }));
  165. const { argparsers } = this.ctx.registries;
  166. const { decorators } = this.ctx.registries;
  167. let in_ = this.ctx.externs.in_;
  168. if ( this.inputRedirect ) {
  169. const { filesystem } = this.ctx.platform;
  170. const dest_path = this.inputRedirect instanceof Token
  171. ? await this.inputRedirect.resolve(this.ctx)
  172. : this.inputRedirect;
  173. const response = await filesystem.read(
  174. resolveRelativePath(this.ctx.vars, dest_path));
  175. in_ = new MemReader(response);
  176. }
  177. const internal_input_pipe = new Pipe();
  178. const valve = new Coupler(in_, internal_input_pipe.in);
  179. in_ = internal_input_pipe.out;
  180. // simple naive implementation for now
  181. const sig = {
  182. listeners_: [],
  183. emit (signal) {
  184. for ( const listener of this.listeners_ ) {
  185. listener(signal);
  186. }
  187. },
  188. on (listener) {
  189. this.listeners_.push(listener);
  190. }
  191. };
  192. in_ = new SignalReader({ delegate: in_, sig });
  193. if ( command.input?.syncLines ) {
  194. in_ = new SyncLinesReader({ delegate: in_ });
  195. }
  196. in_ = new CommandStdinDecorator(in_);
  197. let out = this.ctx.externs.out;
  198. const outputMemWriters = [];
  199. if ( this.outputRedirects.length > 0 ) {
  200. for ( let i=0 ; i < this.outputRedirects.length ; i++ ) {
  201. outputMemWriters.push(new MemWriter());
  202. }
  203. out = new NullifyWriter({ delegate: out });
  204. out = new MultiWriter({
  205. delegates: [...outputMemWriters, out],
  206. });
  207. }
  208. const ctx = this.ctx.sub({
  209. externs: {
  210. in_,
  211. out,
  212. sig,
  213. },
  214. cmdExecState: {
  215. valid: true,
  216. printHelpAndExit: false,
  217. },
  218. locals: {
  219. command,
  220. args,
  221. outputIsRedirected: this.outputRedirects.length > 0,
  222. }
  223. });
  224. if ( command.args ) {
  225. const argProcessorId = command.args.$;
  226. const argProcessor = argparsers[argProcessorId];
  227. const spec = { ...command.args };
  228. delete spec.$;
  229. await argProcessor.process(ctx, spec);
  230. }
  231. if ( ! ctx.cmdExecState.valid ) {
  232. ctx.locals.exit = -1;
  233. await ctx.externs.out.close();
  234. return;
  235. }
  236. if ( ctx.cmdExecState.printHelpAndExit ) {
  237. ctx.locals.exit = 0;
  238. await printUsage(command, ctx.externs.out, ctx.vars);
  239. await ctx.externs.out.close();
  240. return;
  241. }
  242. let execute = command.execute.bind(command);
  243. if ( command.decorators ) {
  244. for ( const decoratorId in command.decorators ) {
  245. const params = command.decorators[decoratorId];
  246. const decorator = decorators[decoratorId];
  247. execute = decorator.decorate(execute, {
  248. command, params, ctx
  249. });
  250. }
  251. }
  252. // FIXME: This is really sketchy...
  253. // `await execute(ctx);` should automatically throw any promise rejections,
  254. // but for some reason Node crashes first, unless we set this handler,
  255. // EVEN IF IT DOES NOTHING. I also can't find a place to safely remove it,
  256. // so apologies if it makes debugging promises harder.
  257. if (ctx.platform.name === 'node') {
  258. const rejectionCatcher = (reason, promise) => {
  259. };
  260. process.on('unhandledRejection', rejectionCatcher);
  261. }
  262. let exit_code = 0;
  263. try {
  264. await execute(ctx);
  265. valve.close();
  266. } catch (e) {
  267. if ( e instanceof Exit ) {
  268. exit_code = e.code;
  269. } else if ( e.code ) {
  270. await ctx.externs.err.write(
  271. '\x1B[31;1m' +
  272. command.name + ': ' +
  273. e.message + '\x1B[0m\n'
  274. );
  275. } else {
  276. await ctx.externs.err.write(
  277. '\x1B[31;1m' +
  278. command.name + ': ' +
  279. e.toString() + '\x1B[0m\n'
  280. );
  281. ctx.locals.exit = -1;
  282. }
  283. }
  284. // ctx.externs.in?.close?.();
  285. // ctx.externs.out?.close?.();
  286. await ctx.externs.out.close();
  287. // TODO: need write command from puter-shell before this can be done
  288. for ( let i=0 ; i < this.outputRedirects.length ; i++ ) {
  289. console.log('output redirect??', this.outputRedirects[i]);
  290. const { filesystem } = this.ctx.platform;
  291. const outputRedirect = this.outputRedirects[i];
  292. const dest_path = outputRedirect instanceof Token
  293. ? await outputRedirect.resolve(this.ctx)
  294. : outputRedirect;
  295. const path = resolveRelativePath(ctx.vars, dest_path);
  296. console.log('it should work?', {
  297. path,
  298. outputMemWriters,
  299. })
  300. // TODO: error handling here
  301. await filesystem.write(path, outputMemWriters[i].getAsBlob());
  302. }
  303. console.log('OUTPUT WRITERS', outputMemWriters);
  304. }
  305. }
  306. export class Pipeline {
  307. static async createFromAST (ctx, ast) {
  308. if ( ast.$ !== 'pipeline' ) {
  309. throw new Error('expected pipeline node');
  310. }
  311. const preparedCommands = [];
  312. for ( const cmdNode of ast.commands ) {
  313. const command = await PreparedCommand.createFromAST(ctx, cmdNode);
  314. preparedCommands.push(command);
  315. }
  316. return new Pipeline({ preparedCommands });
  317. }
  318. constructor ({ preparedCommands }) {
  319. this.preparedCommands = preparedCommands;
  320. }
  321. async execute (ctx) {
  322. const preparedCommands = this.preparedCommands;
  323. let nextIn = ctx.externs.in;
  324. let lastPipe = null;
  325. // TOOD: this will eventually defer piping of certain
  326. // sub-pipelines to the Puter Shell.
  327. for ( let i=0 ; i < preparedCommands.length ; i++ ) {
  328. const command = preparedCommands[i];
  329. // if ( command.command.input?.syncLines ) {
  330. // nextIn = new SyncLinesReader({ delegate: nextIn });
  331. // }
  332. const cmdCtx = { externs: { in_: nextIn } };
  333. const pipe = new Pipe();
  334. lastPipe = pipe;
  335. let cmdOut = pipe.in;
  336. cmdOut = new ByteWriter({ delegate: cmdOut });
  337. cmdCtx.externs.out = cmdOut;
  338. cmdCtx.externs.commandProvider = ctx.externs.commandProvider;
  339. nextIn = pipe.out;
  340. // TODO: need to consider redirect from out to err
  341. cmdCtx.externs.err = ctx.externs.out;
  342. command.setContext(ctx.sub(cmdCtx));
  343. }
  344. const coupler = new Coupler(lastPipe.out, ctx.externs.out);
  345. const commandPromises = [];
  346. for ( let i = preparedCommands.length - 1 ; i >= 0 ; i-- ) {
  347. const command = preparedCommands[i];
  348. commandPromises.push(command.execute());
  349. }
  350. await Promise.all(commandPromises);
  351. console.log('PIPELINE DONE');
  352. await coupler.isDone;
  353. }
  354. }