Pipeline.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /*
  2. * Copyright (C) 2024 Puter Technologies Inc.
  3. *
  4. * This file is part of Phoenix Shell.
  5. *
  6. * Phoenix Shell is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published
  8. * by the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. */
  19. import { SyncLinesReader } from "../ioutil/SyncLinesReader.js";
  20. import { TOKENS } from "../readline/readtoken.js";
  21. import { ByteWriter } from "../ioutil/ByteWriter.js";
  22. import { Coupler } from "./Coupler.js";
  23. import { CommandStdinDecorator } from "./iowrappers.js";
  24. import { Pipe } from "./Pipe.js";
  25. import { MemReader } from "../ioutil/MemReader.js";
  26. import { MemWriter } from "../ioutil/MemWriter.js";
  27. import { MultiWriter } from "../ioutil/MultiWriter.js";
  28. import { NullifyWriter } from "../ioutil/NullifyWriter.js";
  29. import { ConcreteSyntaxError } from "../ConcreteSyntaxError.js";
  30. import { SignalReader } from "../ioutil/SignalReader.js";
  31. import { Exit } from "../../puter-shell/coreutils/coreutil_lib/exit.js";
  32. import { resolveRelativePath } from '../../util/path.js';
  33. import { printUsage } from '../../puter-shell/coreutils/coreutil_lib/help.js';
  /**
   * Wrapper around a `token` AST node.
   *
   * A token is made of components. The component kinds handled here are
   * `symbol` and `string.segment` (literal text) and `pipeline` (a nested
   * pipeline whose captured output is substituted into the token's value).
   */
  class Token {
      /**
       * Builds a Token from an AST node.
       *
       * @param {object} ctx - execution context (not used here)
       * @param {object} ast - AST node; its `$` must be `'token'`
       * @returns {Token}
       * @throws {Error} when `ast.$` is not `'token'`
       */
      static createFromAST (ctx, ast) {
          if ( ast.$ === 'token' ) {
              return new Token(ast);
          }
          throw new Error('expected token node');
      }

      /**
       * @param {object} ast - the `token` AST node to wrap
       */
      constructor (ast) {
          this.ast = ast;
          // CST reference of the first component, if there is one; used
          // elsewhere in this file when reporting syntax errors.
          this.$cst = ast.components?.[0]?.$cst;
      }

      /**
       * Returns the token's text without executing anything, provided
       * every component is literal text (`symbol` or `string.segment`).
       *
       * @param {object} ctx - execution context (not used here)
       * @returns {string|undefined} the concatenated text, or `undefined`
       *   when at least one component is of another kind
       */
      maybeStaticallyResolve (ctx) {
          const { components } = this.ast;

          const hasDynamicPart = components.some(part => {
              return part.$ !== 'symbol' && part.$ !== 'string.segment';
          });
          if ( hasDynamicPart ) return;

          // TODO: Variables can also be statically resolved, I think...
          return components.reduce((text, part) => text + part.text, '');
      }

      /**
       * Computes the token's value, running nested pipelines as needed.
       *
       * Literal components contribute their text. A `pipeline` component
       * is executed with its output captured in memory; the captured text,
       * with trailing whitespace removed, is appended. Components of any
       * other kind contribute nothing.
       *
       * @param {object} ctx - execution context; must provide `sub()` when
       *   the token contains a pipeline component
       * @returns {Promise<string>} the resolved value
       */
      async resolve (ctx) {
          let resolved = '';

          for ( const part of this.ast.components ) {
              switch ( part.$ ) {
              case 'string.segment':
              case 'symbol':
                  resolved += part.text;
                  break;
              case 'pipeline': {
                  const nested = await Pipeline.createFromAST(ctx, part);
                  const capture = new MemWriter();
                  const nestedCtx = ctx.sub({ externs: { out: capture } });
                  await nested.execute(nestedCtx);
                  resolved += capture.getAsString().trimEnd();
                  break;
              }
              default:
                  break;
              }
          }

          return resolved;
      }
  }
  /**
   * One command of a pipeline: the command to run (or a Token that yields
   * its name at execution time), its argument tokens, and its redirects.
   *
   * Usage is two-phase: `createFromAST()` builds the instance, then the
   * owner calls `setContext()` followed by `execute()`.
   */
  export class PreparedCommand {
      /**
       * Builds a PreparedCommand from a `command` AST node.
       *
       * The first token is the command name. When it consists only of
       * literal text it is looked up immediately through
       * `ctx.externs.commandProvider`; otherwise the Token itself is kept
       * and resolved in `execute()`.
       *
       * The AST node passed in is not modified.
       *
       * @param {object} ctx - context providing `externs.commandProvider`
       * @param {object} ast - AST node with `$ === 'command'`, plus
       *   `tokens`, `inputRedirects` and `outputRedirects` arrays
       * @returns {Promise<PreparedCommand>}
       * @throws {Error} when `ast.$` is not `'command'`
       * @throws {ConcreteSyntaxError} when a statically known command name
       *   is not found by the command provider
       */
      static async createFromAST (ctx, ast) {
          if ( ast.$ !== 'command' ) {
              throw new Error('expected command node');
          }

          // Split name from arguments without touching `ast.tokens`.
          // Calling shift() on a shallow copy of the node would remove the
          // name token from the caller's AST as well.
          const [commandNode, ...argNodes] = ast.tokens;
          const command_token = Token.createFromAST(ctx, commandNode);

          // TODO: check that node for command name is of a
          //       supported type - maybe use adapt pattern
          const cmd = command_token.maybeStaticallyResolve(ctx);

          const { commandProvider } = ctx.externs;

          // A falsy `cmd` (not statically resolvable, or empty) defers the
          // lookup to execute() by keeping the Token.
          const command = cmd
              ? await commandProvider.lookup(cmd, { ctx })
              : command_token;

          if ( command === undefined ) {
              throw new ConcreteSyntaxError(
                  `no command: ${JSON.stringify(cmd)}`,
                  command_token.$cst,
              );
          }

          // Redirect targets are stored as plain strings when statically
          // known, otherwise as Tokens to be resolved in execute().
          const toRedirectTarget = node => {
              const token = Token.createFromAST(ctx, node);
              return token.maybeStaticallyResolve(ctx) ?? token;
          };

          // TODO: test this
          // Only the first input redirect is honoured.
          const inputRedirect = ast.inputRedirects.length > 0
              ? toRedirectTarget(ast.inputRedirects[0])
              : null;

          // TODO: test this
          const outputRedirects = ast.outputRedirects.map(
              rdirNode => toRedirectTarget(rdirNode));

          return new PreparedCommand({
              command,
              args: argNodes.map(node => Token.createFromAST(ctx, node)),
              inputRedirect,
              outputRedirects,
          });
      }

      /**
       * @param {object} options
       * @param {object|Token} options.command - resolved command, or a
       *   Token whose value names the command
       * @param {Array} options.args - argument Tokens
       * @param {string|Token|null} options.inputRedirect - input file path
       * @param {Array<string|Token>} options.outputRedirects - output paths
       */
      constructor ({ command, args, inputRedirect, outputRedirects }) {
          this.command = command;
          this.args = args;
          this.inputRedirect = inputRedirect;
          this.outputRedirects = outputRedirects;
      }

      /**
       * Sets the context `execute()` will run in.
       *
       * @param {object} ctx - context supplying `externs`, `registries`,
       *   `platform` and `vars`
       */
      setContext (ctx) {
          this.ctx = ctx;
      }

      /**
       * Runs the command in the context given to `setContext()`.
       *
       * Steps: resolve the command and arguments, build the input and
       * output streams (including redirects), run the argument processor,
       * run the (possibly decorated) command, close the output, then write
       * every output redirect to the filesystem.
       *
       * Errors thrown by the command are reported on `externs.err` and not
       * rethrown; `Exit` is treated as a normal termination.
       *
       * @returns {Promise<void>}
       * @throws {Error} when a dynamically resolved command name is not
       *   found by the command provider
       */
      async execute () {
          let { command, args } = this;

          // A Token here means the command name could not be determined
          // statically, so resolve it now (possibly by running a nested
          // pipeline) and look the command up.
          if ( command instanceof Token ) {
              const cmd = await command.resolve(this.ctx);
              const { commandProvider } = this.ctx.externs;
              command = await commandProvider.lookup(cmd, { ctx: this.ctx });
              if ( command === undefined ) {
                  throw new Error('no command: ' + JSON.stringify(cmd));
              }
          }

          args = await Promise.all(args.map(async node => {
              if ( node instanceof Token ) {
                  return await node.resolve(this.ctx);
              }
              return node.text;
          }));

          const { argparsers } = this.ctx.registries;
          const { decorators } = this.ctx.registries;

          // --- stdin -----------------------------------------------------
          let in_ = this.ctx.externs.in_;
          if ( this.inputRedirect ) {
              // Input redirect: read the whole file and serve it from memory.
              const { filesystem } = this.ctx.platform;
              const dest_path = this.inputRedirect instanceof Token
                  ? await this.inputRedirect.resolve(this.ctx)
                  : this.inputRedirect;
              const response = await filesystem.read(
                  resolveRelativePath(this.ctx.vars, dest_path));
              in_ = new MemReader(response);
          }

          // The command reads from an internal pipe fed by `valve`, which
          // is closed once the command finishes successfully.
          // NOTE(review): presumably the Coupler pumps `in_` into the pipe
          // until closed — confirm against Coupler.js.
          const internal_input_pipe = new Pipe();
          const valve = new Coupler(in_, internal_input_pipe.in);
          in_ = internal_input_pipe.out;

          // simple naive implementation for now
          const sig = {
              listeners_: [],
              emit (signal) {
                  for ( const listener of this.listeners_ ) {
                      listener(signal);
                  }
              },
              on (listener) {
                  this.listeners_.push(listener);
              }
          };

          in_ = new SignalReader({ delegate: in_, sig });

          if ( command.input?.syncLines ) {
              in_ = new SyncLinesReader({ delegate: in_ });
          }
          in_ = new CommandStdinDecorator(in_);

          // --- stdout ----------------------------------------------------
          let out = this.ctx.externs.out;
          const outputMemWriters = [];
          if ( this.outputRedirects.length > 0 ) {
              // One in-memory buffer per redirect target; the buffers are
              // written to files after the command has finished.
              for ( let i = 0 ; i < this.outputRedirects.length ; i++ ) {
                  outputMemWriters.push(new MemWriter());
              }
              // NOTE(review): NullifyWriter presumably keeps redirected
              // output from reaching the original `out` — confirm.
              out = new NullifyWriter({ delegate: out });
              out = new MultiWriter({
                  delegates: [...outputMemWriters, out],
              });
          }

          const ctx = this.ctx.sub({
              externs: {
                  in_,
                  out,
                  sig,
              },
              cmdExecState: {
                  valid: true,
                  printHelpAndExit: false,
              },
              locals: {
                  command,
                  args,
                  outputIsRedirected: this.outputRedirects.length > 0,
              }
          });

          // --- argument processing -----------------------------------------
          if ( command.args ) {
              // `command.args.$` selects the arg processor; the remaining
              // properties are passed to it as the spec.
              const argProcessorId = command.args.$;
              const argProcessor = argparsers[argProcessorId];
              const spec = { ...command.args };
              delete spec.$;
              await argProcessor.process(ctx, spec);
          }

          if ( ! ctx.cmdExecState.valid ) {
              ctx.locals.exit = -1;
              await ctx.externs.out.close();
              return;
          }

          if ( ctx.cmdExecState.printHelpAndExit ) {
              ctx.locals.exit = 0;
              await printUsage(command, ctx.externs.out, ctx.vars);
              await ctx.externs.out.close();
              return;
          }

          // --- execution -------------------------------------------------
          let execute = command.execute.bind(command);
          if ( command.decorators ) {
              for ( const decoratorId in command.decorators ) {
                  const params = command.decorators[decoratorId];
                  const decorator = decorators[decoratorId];
                  execute = decorator.decorate(execute, {
                      command, params, ctx
                  });
              }
          }

          // FIXME: This is really sketchy...
          //        `await execute(ctx);` should automatically throw any promise rejections,
          //        but for some reason Node crashes first, unless we set this handler,
          //        EVEN IF IT DOES NOTHING. I also can't find a place to safely remove it,
          //        so apologies if it makes debugging promises harder.
          // The handler is never removed, so install it only once per
          // process. Adding one per executed command leaks listeners and
          // eventually triggers Node's MaxListenersExceededWarning.
          if ( ctx.platform.name === 'node' ) {
              if ( ! PreparedCommand.rejectionCatcherInstalled_ ) {
                  const rejectionCatcher = (reason, promise) => {
                  };
                  process.on('unhandledRejection', rejectionCatcher);
                  PreparedCommand.rejectionCatcherInstalled_ = true;
              }
          }

          // NOTE(review): `exit_code` is assigned but never read in this
          // method, so an `Exit` code is currently dropped; also the
          // `e.code` branch does not set `ctx.locals.exit`. Confirm the
          // intended behaviour before wiring it up.
          let exit_code = 0;
          try {
              await execute(ctx);
              // NOTE(review): the valve is not closed when execute()
              // throws — confirm whether that is intended.
              valve.close();
          } catch (e) {
              if ( e instanceof Exit ) {
                  exit_code = e.code;
              } else if ( e.code ) {
                  await ctx.externs.err.write(
                      '\x1B[31;1m' +
                      command.name + ': ' +
                      e.message + '\x1B[0m\n'
                  );
              } else {
                  await ctx.externs.err.write(
                      '\x1B[31;1m' +
                      command.name + ': ' +
                      e.toString() + '\x1B[0m\n'
                  );
                  ctx.locals.exit = -1;
              }
          }

          await ctx.externs.out.close();

          // --- output redirects --------------------------------------------
          // TODO: need write command from puter-shell before this can be done
          for ( let i = 0 ; i < this.outputRedirects.length ; i++ ) {
              const { filesystem } = this.ctx.platform;
              const outputRedirect = this.outputRedirects[i];
              const dest_path = outputRedirect instanceof Token
                  ? await outputRedirect.resolve(this.ctx)
                  : outputRedirect;
              const path = resolveRelativePath(ctx.vars, dest_path);
              // TODO: error handling here
              await filesystem.write(path, outputMemWriters[i].getAsBlob());
          }
      }
  }
  /**
   * A sequence of prepared commands in which each command's output is
   * connected to the next command's input through a Pipe, and the last
   * command's output is coupled to the pipeline's own output.
   */
  export class Pipeline {
      /**
       * Builds a Pipeline from a `pipeline` AST node by preparing each of
       * its commands in order.
       *
       * @param {object} ctx - context handed to PreparedCommand.createFromAST
       * @param {object} ast - AST node with `$ === 'pipeline'` and a
       *   `commands` array
       * @returns {Promise<Pipeline>}
       * @throws {Error} when `ast.$` is not `'pipeline'`
       */
      static async createFromAST (ctx, ast) {
          if ( ast.$ !== 'pipeline' ) {
              throw new Error('expected pipeline node');
          }

          const preparedCommands = [];

          for ( const cmdNode of ast.commands ) {
              const command = await PreparedCommand.createFromAST(ctx, cmdNode);
              preparedCommands.push(command);
          }

          return new Pipeline({ preparedCommands });
      }

      /**
       * @param {object} options
       * @param {Array} options.preparedCommands - commands in pipeline order
       */
      constructor ({ preparedCommands }) {
          this.preparedCommands = preparedCommands;
      }

      /**
       * Wires the commands together and runs them all concurrently.
       *
       * Resolves once every command has finished and the coupler that
       * forwards the last command's output to `ctx.externs.out` is done.
       * A pipeline without commands resolves immediately.
       *
       * @param {object} ctx - context providing `externs.out`,
       *   `externs.commandProvider` and `sub()`
       * @returns {Promise<void>}
       */
      async execute (ctx) {
          const preparedCommands = this.preparedCommands;

          // Nothing to run: without this guard `lastPipe` stays null and
          // the Coupler construction below throws a TypeError.
          if ( preparedCommands.length === 0 ) return;

          // NOTE(review): this reads `externs.in`, while the rest of this
          // file uses the key `in_` for the input stream — confirm which
          // key the caller's context actually provides.
          let nextIn = ctx.externs.in;
          let lastPipe = null;

          // TODO: this will eventually defer piping of certain
          //       sub-pipelines to the Puter Shell.

          for ( let i = 0 ; i < preparedCommands.length ; i++ ) {
              const command = preparedCommands[i];

              // Each command reads from the previous pipe (or the pipeline
              // input) and writes into a fresh pipe.
              const cmdCtx = { externs: { in_: nextIn } };

              const pipe = new Pipe();
              lastPipe = pipe;
              let cmdOut = pipe.in;
              cmdOut = new ByteWriter({ delegate: cmdOut });
              cmdCtx.externs.out = cmdOut;
              cmdCtx.externs.commandProvider = ctx.externs.commandProvider;
              nextIn = pipe.out;

              // TODO: need to consider redirect from out to err
              // Error output currently goes to the pipeline's own output.
              cmdCtx.externs.err = ctx.externs.out;
              command.setContext(ctx.sub(cmdCtx));
          }

          // Forward the output of the last command to the pipeline output.
          const coupler = new Coupler(lastPipe.out, ctx.externs.out);

          // Commands are started from last to first and awaited together.
          const commandPromises = [];
          for ( let i = preparedCommands.length - 1 ; i >= 0 ; i-- ) {
              const command = preparedCommands[i];
              commandPromises.push(command.execute());
          }
          await Promise.all(commandPromises);

          await coupler.isDone;
      }
  }