ll_copy.js 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. /*
  2. * Copyright (C) 2024 Puter Technologies Inc.
  3. *
  4. * This file is part of Puter.
  5. *
  6. * Puter is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published
  8. * by the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  18. */
  19. const config = require('../../config');
  20. const { Context } = require('../../util/context');
  21. const { ParallelTasks } = require('../../util/otelutil');
  22. const FSNodeContext = require('../FSNodeContext');
  23. const { NodeUIDSelector } = require('../node/selectors');
  24. const { RESOURCE_STATUS_PENDING_CREATE } = require('../storage/ResourceService');
  25. const { UploadProgressTracker } = require('../storage/UploadProgressTracker');
  26. const { LLFilesystemOperation } = require('./definitions');
  27. class LLCopy extends LLFilesystemOperation {
  28. static MODULES = {
  29. _path: require('path'),
  30. uuidv4: require('uuid').v4,
  31. }
  32. async _run () {
  33. const { _path, uuidv4 } = this.modules;
  34. const { context } = this;
  35. const { source, parent, user, actor, target_name } = this.values;
  36. const svc = context.get('services');
  37. const tracer = svc.get('traceService').tracer;
  38. const fs = svc.get('filesystem');
  39. const svc_event = svc.get('event');
  40. const uuid = uuidv4();
  41. const ts = Math.round(Date.now()/1000);
  42. this.field('target-uid', uuid);
  43. this.field('source', source.selector.describe());
  44. this.checkpoint('before fetch parent entry');
  45. await parent.fetchEntry();
  46. this.checkpoint('before fetch source entry');
  47. await source.fetchEntry({ thumbnail: true });
  48. this.checkpoint('fetched source and parent entries');
  49. console.log('PATH PARAMETERS', {
  50. path: await parent.get('path'),
  51. target_name,
  52. })
  53. // Access Control
  54. {
  55. const svc_acl = context.get('services').get('acl');
  56. this.checkpoint('copy :: access control');
  57. // Check read access to source
  58. if ( ! await svc_acl.check(actor, source, 'read') ) {
  59. throw await svc_acl.get_safe_acl_error(actor, source, 'read');
  60. }
  61. // Check write access to destination
  62. if ( ! await svc_acl.check(actor, parent, 'write') ) {
  63. throw await svc_acl.get_safe_acl_error(actor, source, 'write');
  64. }
  65. }
  66. const raw_fsentry = {
  67. uuid,
  68. is_dir: source.entry.is_dir,
  69. ...(source.entry.is_shortcut ? {
  70. is_shortcut: source.entry.is_shortcut,
  71. shortcut_to: source.entry.shortcut_to,
  72. } :{}),
  73. parent_uid: parent.uid,
  74. name: target_name,
  75. created: ts,
  76. modified: ts,
  77. path: _path.join(await parent.get('path'), target_name),
  78. // if property exists but the value is undefined,
  79. // it will still be included in the INSERT, causing
  80. // an error
  81. ...(source.entry.thumbnail ?
  82. { thumbnail: source.entry.thumbnail } : {}),
  83. user_id: user.id,
  84. };
  85. svc_event.emit('fs.pending.file', {
  86. fsentry: FSNodeContext.sanitize_pending_entry_info(raw_fsentry),
  87. context: this.context,
  88. })
  89. this.checkpoint('emitted fs.pending.file');
  90. if ( await source.get('has-s3') ) {
  91. Object.assign(raw_fsentry, {
  92. size: source.entry.size,
  93. associated_app_id: source.entry.associated_app_id,
  94. bucket: source.entry.bucket,
  95. bucket_region: source.entry.bucket_region,
  96. });
  97. await tracer.startActiveSpan(`fs:cp:storage-copy`, async span => {
  98. let progress_tracker = new UploadProgressTracker();
  99. svc_event.emit('fs.storage.progress.copy', {
  100. upload_tracker: progress_tracker,
  101. context: Context.get(),
  102. meta: {
  103. item_uid: uuid,
  104. item_path: raw_fsentry.path,
  105. }
  106. });
  107. this.checkpoint('emitted fs.storage.progress.copy');
  108. // const storage = new PuterS3StorageStrategy({ services: svc });
  109. const storage = Context.get('storage');
  110. const state_copy = storage.create_copy();
  111. await state_copy.run({
  112. src_node: source,
  113. dst_storage: {
  114. key: uuid,
  115. bucket: raw_fsentry.bucket,
  116. bucket_region: raw_fsentry.bucket_region,
  117. },
  118. storage_api: { progress_tracker },
  119. });
  120. this.checkpoint('finished storage copy');
  121. span.end();
  122. });
  123. }
  124. {
  125. const svc_size = svc.get('sizeService');
  126. await svc_size.add_node_size(undefined, source, user);
  127. this.checkpoint('added source size');
  128. }
  129. const svc_resource = svc.get('resourceService');
  130. svc_resource.register({
  131. uid: uuid,
  132. status: RESOURCE_STATUS_PENDING_CREATE,
  133. });
  134. const svc_fsentry = svc.get('systemFSEntryService');
  135. this.log.info(`inserting entry: ` + uuid);
  136. const entryOp = await svc_fsentry.insert(raw_fsentry);
  137. let node;
  138. this.checkpoint('before parallel tasks');
  139. const tasks = new ParallelTasks({ tracer, max: 4 });
  140. await tracer.startActiveSpan(`fs:cp:parallel-portion`, async span => {
  141. this.checkpoint('starting parallel tasks');
  142. // Add child copy tasks if this is a directory
  143. if ( source.entry.is_dir ) {
  144. const fsEntryService = svc.get('fsEntryService');
  145. const children = await fsEntryService.fast_get_direct_descendants(
  146. source.uid
  147. );
  148. for ( const child_uuid of children ) {
  149. tasks.add(`fs:cp:copy-child`, async () => {
  150. const child_node = await fs.node(
  151. new NodeUIDSelector(child_uuid)
  152. );
  153. const child_name = await child_node.get('name');
  154. // TODO: this should be LLCopy instead
  155. const ll_copy = new LLCopy();
  156. console.log('LL Copy Start');
  157. await ll_copy.run({
  158. source: await fs.node(
  159. new NodeUIDSelector(child_uuid)
  160. ),
  161. parent: await fs.node(
  162. new NodeUIDSelector(uuid)
  163. ),
  164. user,
  165. target_name: child_name,
  166. });
  167. console.log('LL Copy End');
  168. // const hl_copy = new HLCopy();
  169. // await hl_copy.run({
  170. // destination_or_parent: await fs.node(
  171. // new NodeUIDSelector(uuid)
  172. // ),
  173. // source: await fs.node(
  174. // new NodeUIDSelector(child_uuid)
  175. // ),
  176. // user
  177. // });
  178. // await fs.cp(fs, {
  179. // source: await fs.node(
  180. // new NodeUIDSelector(child_uuid)
  181. // ),
  182. // // TODO: don't do this when cp supports uuids
  183. // destinationOrParent: await fs.node(
  184. // new NodeUIDSelector(uuid)
  185. // ),
  186. // user,
  187. // overwrite: false,
  188. // create_missing_parents: false,
  189. // ancestor_check_not_needed: true,
  190. // });
  191. });
  192. }
  193. }
  194. // Add task to await entry
  195. tasks.add(`fs:cp:entry-op`, async () => {
  196. await entryOp.awaitDone();
  197. svc_resource.free(uuid);
  198. this.log.info(`done inserting entry: ` + uuid);
  199. const copy_fsNode = await fs.node(new NodeUIDSelector(uuid));
  200. copy_fsNode.entry = raw_fsentry;
  201. copy_fsNode.found = true;
  202. copy_fsNode.path = raw_fsentry.path;
  203. node = copy_fsNode;
  204. svc_event.emit('fs.create.file', {
  205. node,
  206. context: this.context,
  207. })
  208. }, { force: true });
  209. this.checkpoint('waiting for parallel tasks');
  210. await tasks.awaitAll();
  211. this.checkpoint('finishing up');
  212. span.end();
  213. });
  214. node = node || await fs.node(new NodeUIDSelector(uuid));
  215. // TODO: What event do we emit? How do we know if we're overwriting?
  216. return node;
  217. }
  218. }
  219. module.exports = {
  220. LLCopy,
  221. };