Sfoglia il codice sorgente

dev: CLink and SLink classes

KernelDeimos 9 mesi fa
parent
commit
e2f35b4844

+ 4 - 1
doc/contributors/comment_prefixes.md

@@ -27,4 +27,7 @@ This document will be updated on an _as-needed_ basis.
     [track-comments.md](../devmeta/track-comments.md)
 - `wet:` is usesd to track anything that doesn't adhere
   to the DRY principle; the following message should describe
-  where similar code is
+  where similar code is
+- `compare(<identifier>):` is used to note differences between other
+  implementations of a similar idea
+- `name:` pedantic commentary on the name of something

+ 14 - 256
src/backend/src/modules/broadcast/BroadcastService.js

@@ -18,257 +18,8 @@
  */
 const { AdvancedBase } = require("@heyputer/puter-js-common");
 const BaseService = require("../../services/BaseService");
-
-class KeyPairHelper extends AdvancedBase {
-    static MODULES = {
-        tweetnacl: require('tweetnacl'),
-    };
-    
-    constructor ({
-        kpublic,
-        ksecret,
-    }) {
-        super();
-        this.kpublic = kpublic;
-        this.ksecret = ksecret;
-        this.nonce_ = 0;
-    }
-    
-    to_nacl_key_ (key) {
-        console.log('WUT', key);
-        const full_buffer = Buffer.from(key, 'base64');
-
-        // Remove version byte (assumed to be 0x31 and ignored for now)
-        const buffer = full_buffer.slice(1);
-        
-        return new Uint8Array(buffer);
-    }
-    
-    get naclSecret () {
-        return this.naclSecret_ ?? (
-            this.naclSecret_ = this.to_nacl_key_(this.ksecret));
-    }
-    get naclPublic () {
-        return this.naclPublic_ ?? (
-            this.naclPublic_ = this.to_nacl_key_(this.kpublic));
-    }
-    
-    write (text) {
-        const require = this.require;
-        const nacl = require('tweetnacl');
-
-        const nonce = nacl.randomBytes(nacl.box.nonceLength);
-        const message = {};
-        
-        const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8'));
-        const encryptedText = nacl.box(
-            textUint8, nonce,
-            this.naclPublic, this.naclSecret
-        );
-        message.text = Buffer.from(encryptedText);
-        message.nonce = Buffer.from(nonce);
-        
-        return message;
-    }
-    
-    read (message) {
-        const require = this.require;
-        const nacl = require('tweetnacl');
-        
-        const arr = nacl.box.open(
-            new Uint8Array(message.text),
-            new Uint8Array(message.nonce),
-            this.naclPublic,
-            this.naclSecret,
-        );
-        
-        return Buffer.from(arr).toString('utf-8');
-    }
-}
-
-class Peer extends AdvancedBase {
-    static AUTHENTICATING = Symbol('AUTHENTICATING');
-    static ONLINE = Symbol('ONLINE');
-    static OFFLINE = Symbol('OFFLINE');
-    
-    static MODULES = {
-        sioclient: require('socket.io-client'),
-        crypto: require('crypto'),
-    };
-
-    constructor (svc_broadcast, config) {
-        super();
-        this.svc_broadcast = svc_broadcast;
-        this.log = this.svc_broadcast.log;
-        this.config = config;
-    }
-    
-    send (data) {
-        if ( ! this.socket ) return;
-        const require = this.require;
-        const crypto = require('crypto');
-        const iv = crypto.randomBytes(16);
-        const cipher = crypto.createCipheriv(
-            'aes-256-cbc',
-            this.aesKey,
-            iv,
-        );
-        const jsonified = JSON.stringify(data);
-        let buffers = [];
-        buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8')));
-        buffers.push(cipher.final());
-        const buffer = Buffer.concat(buffers);
-        this.socket.send({
-            iv,
-            message: buffer,
-        });
-    }
-    
-    get state () {
-        try {
-            if ( this.socket?.connected ) return this.constructor.ONLINE;
-        } catch (e) {
-            console.error('could not get peer state', e);
-        }
-        return this.constructor.OFFLINE;
-    }
-    
-    connect () {
-        const address = this.config.address;
-        const socket = this.modules.sioclient(address, {
-            transports: ['websocket'],
-            path: '/wssinternal',
-            reconnection: true,
-            extraHeaders: {
-                ...(this.config.host ? {
-                    Host: this.config.host,
-                } : {})
-            }
-        });
-        socket.on('connect', () => {
-            this.log.info(`connected`, {
-                address: this.config.address
-            });
-
-            const require = this.require;
-            const crypto = require('crypto');
-            this.aesKey = crypto.randomBytes(32);
-
-            const kp_helper = new KeyPairHelper({
-                kpublic: this.config.key,
-                ksecret: this.svc_broadcast.config.keys.secret,
-            });
-            socket.send({
-                $: 'take-my-key',
-                key: this.svc_broadcast.config.keys.public,
-                message: kp_helper.write(
-                    this.aesKey.toString('base64')
-                ),
-            });
-        });
-        socket.on('disconnect', () => {
-            this.log.info(`disconnected`, {
-                address: this.config.address
-            });
-        });
-        socket.on('connect_error', e => {
-            this.log.info(`connection error`, {
-                address: this.config.address,
-                message: e.message,
-            });
-        });
-        socket.on('error', e => {
-            this.log.info('error', {
-                message: e.message,
-            });
-        });
-
-        this.socket = socket;
-    }
-}
-
-class Connection extends AdvancedBase {
-    static MODULES = {
-        crypto: require('crypto'),
-    }
-
-    static AUTHENTICATING = {
-        on_message (data) {
-            if ( data.$ !== 'take-my-key' ) {
-                this.disconnect();
-                return;
-            }
-            
-            const hasKey = this.svc_broadcast.trustedPublicKeys_[data.key];
-            if ( ! hasKey ) {
-                this.disconnect();
-                return;
-            }
-            
-            const is_trusted =
-                this.svc_broadcast.trustedPublicKeys_
-                    .hasOwnProperty(data.key)
-            if ( ! is_trusted ) {
-                this.disconnect();
-                return;
-            }
-
-            const kp_helper = new KeyPairHelper({
-                kpublic: data.key,
-                ksecret: this.svc_broadcast.config.keys.secret,
-            });
-            
-            const message = kp_helper.read(data.message);
-            this.aesKey = Buffer.from(message, 'base64');
-            
-            this.state = this.constructor.ONLINE;
-        }
-    }
-    static ONLINE = {
-        on_message (data) {
-            if ( ! this.on_message ) return;
-            
-            const require = this.require;
-            const crypto = require('crypto');
-            const decipher = crypto.createDecipheriv(
-                'aes-256-cbc',
-                this.aesKey,
-                data.iv,
-            )
-            const buffers = [];
-            buffers.push(decipher.update(data.message));
-            buffers.push(decipher.final());
-            
-            const rawjson = Buffer.concat(buffers).toString('utf-8');
-            
-            const output = JSON.parse(rawjson);
-            
-            this.on_message(output);
-        }
-    }
-    static OFFLINE = {
-        on_message () {
-            throw new Error('unexpected message');
-        }
-    }
-    
-    constructor (svc_broadcast, socket) {
-        super();
-        this.state = this.constructor.AUTHENTICATING;
-        this.svc_broadcast = svc_broadcast;
-        this.log = this.svc_broadcast.log;
-        this.socket = socket;
-        
-        socket.on('message', data => {
-            this.state.on_message.call(this, data);
-        });
-    }
-    
-    disconnect () {
-        this.socket.disconnect(true);
-        this.state = this.constructor.OFFLINE;
-    }
-}
+const { CLink } = require("./connection/CLink");
+const { SLink } = require("./connection/SLink");
 
 class BroadcastService extends BaseService {
     static MODULES = {
@@ -286,7 +37,11 @@ class BroadcastService extends BaseService {
         const peers = this.config.peers ?? [];
         for ( const peer_config of peers ) {
             this.trustedPublicKeys_[peer_config.key] = true;
-            const peer = new Peer(this, peer_config);
+            const peer = new CLink({
+                keys: this.config.keys,
+                config: peer_config,
+                log: this.log,
+            });
             this.peers_.push(peer);
             peer.connect();
         }
@@ -301,7 +56,6 @@ class BroadcastService extends BaseService {
         if ( meta.from_outside ) return;
         
         for ( const peer of this.peers_ ) {
-            if ( peer.state !== Peer.ONLINE ) continue;
             peer.send({ key, data, meta });
         }
     }
@@ -318,10 +72,14 @@ class BroadcastService extends BaseService {
         });
         
         io.on('connection', async socket => {
-            const conn = new Connection(this, socket);
+            const conn = new SLink({
+                keys: this.config.keys,
+                trustedKeys: this.trustedPublicKeys_,
+                socket,
+            });
             this.connections_.push(conn);
             
-            conn.on_message = ({ key, data, meta }) => {
+            conn.channels.message.on(({ key, data, meta }) => {
                 if ( meta.from_outside ) {
                     this.log.noticeme('possible over-sending');
                     return;
@@ -335,7 +93,7 @@ class BroadcastService extends BaseService {
                 
                 meta.from_outside = true;
                 svc_event.emit(key, data, meta);
-            };
+            });
         });
         
         

+ 34 - 0
src/backend/src/modules/broadcast/connection/BaseLink.js

@@ -0,0 +1,34 @@
+const { AdvancedBase } = require("@heyputer/puter-js-common");
+const { ChannelFeature } = require("../../../traits/ChannelFeature");
+
+class BaseLink extends AdvancedBase {
+    static FEATURES = [
+        new ChannelFeature(),
+    ];
+    static CHANNELS = ['message'];
+
+    static MODULES = {
+        crypto: require('crypto'),
+    };
+
+    static AUTHENTICATING = {};
+    static ONLINE = {};
+    static OFFLINE = {};
+
+    send (data) {
+        if ( this.state !== this.constructor.ONLINE ) {
+            return false;
+        }
+
+        return this._send(data);
+    }
+
+    constructor () {
+        super();
+        this.state = this.constructor.AUTHENTICATING;
+    }
+}
+
+module.exports = {
+    BaseLink,
+};

+ 103 - 0
src/backend/src/modules/broadcast/connection/CLink.js

@@ -0,0 +1,103 @@
+const { BaseLink } = require("./BaseLink");
+const { KeyPairHelper } = require("./KeyPairHelper");
+
+class CLink extends BaseLink {
+    static MODULES = {
+        sioclient: require('socket.io-client'),
+    };
+
+    _send (data) {
+        if ( ! this.socket ) return;
+        const require = this.require;
+        const crypto = require('crypto');
+        const iv = crypto.randomBytes(16);
+        const cipher = crypto.createCipheriv(
+            'aes-256-cbc',
+            this.aesKey,
+            iv,
+        );
+        const jsonified = JSON.stringify(data);
+        let buffers = [];
+        buffers.push(cipher.update(Buffer.from(jsonified, 'utf-8')));
+        buffers.push(cipher.final());
+        const buffer = Buffer.concat(buffers);
+        this.socket.send({
+            iv,
+            message: buffer,
+        });
+    }
+
+    constructor ({
+        keys,
+        log,
+        config,
+    }) {
+        super();
+        // keys of client (local)
+        this.keys = keys;
+        // keys of server (remote)
+        this.config = config;
+        this.log = log;
+    }
+
+    connect () {
+        const address = this.config.address;
+        const socket = this.modules.sioclient(address, {
+            transports: ['websocket'],
+            path: '/wssinternal',
+            reconnection: true,
+            extraHeaders: {
+                ...(this.config.host ? {
+                    Host: this.config.host,
+                } : {})
+            }
+        });
+        socket.on('connect', () => {
+            this.log.info(`connected`, {
+                address: this.config.address
+            });
+
+            const require = this.require;
+            const crypto = require('crypto');
+            this.aesKey = crypto.randomBytes(32);
+
+            const kp_helper = new KeyPairHelper({
+                kpublic: this.config.key,
+                ksecret: this.keys.secret,
+            });
+            socket.send({
+                $: 'take-my-key',
+                key: this.keys.public,
+                message: kp_helper.write(
+                    this.aesKey.toString('base64')
+                ),
+            });
+            this.state = this.constructor.ONLINE;
+        });
+        socket.on('disconnect', () => {
+            this.log.info(`disconnected`, {
+                address: this.config.address
+            });
+        });
+        socket.on('connect_error', e => {
+            this.log.info(`connection error`, {
+                address: this.config.address,
+                message: e.message,
+            });
+        });
+        socket.on('error', e => {
+            this.log.info('error', {
+                message: e.message,
+            });
+        });
+        socket.on('message', data => {
+            if ( this.state.on_message ) {
+                this.state.on_message.call(this, data);
+            }
+        });
+
+        this.socket = socket;
+    }
+}
+
+module.exports = { CLink };

+ 72 - 0
src/backend/src/modules/broadcast/connection/KeyPairHelper.js

@@ -0,0 +1,72 @@
+const { AdvancedBase } = require('@heyputer/puter-js-common');
+
+class KeyPairHelper extends AdvancedBase {
+    static MODULES = {
+        tweetnacl: require('tweetnacl'),
+    };
+    
+    constructor ({
+        kpublic,
+        ksecret,
+    }) {
+        super();
+        this.kpublic = kpublic;
+        this.ksecret = ksecret;
+        this.nonce_ = 0;
+    }
+    
+    to_nacl_key_ (key) {
+        console.log('WUT', key);
+        const full_buffer = Buffer.from(key, 'base64');
+
+        // Remove version byte (assumed to be 0x31 and ignored for now)
+        const buffer = full_buffer.slice(1);
+        
+        return new Uint8Array(buffer);
+    }
+    
+    get naclSecret () {
+        return this.naclSecret_ ?? (
+            this.naclSecret_ = this.to_nacl_key_(this.ksecret));
+    }
+    get naclPublic () {
+        return this.naclPublic_ ?? (
+            this.naclPublic_ = this.to_nacl_key_(this.kpublic));
+    }
+    
+    write (text) {
+        const require = this.require;
+        const nacl = require('tweetnacl');
+
+        const nonce = nacl.randomBytes(nacl.box.nonceLength);
+        const message = {};
+        
+        const textUint8 = new Uint8Array(Buffer.from(text, 'utf-8'));
+        const encryptedText = nacl.box(
+            textUint8, nonce,
+            this.naclPublic, this.naclSecret
+        );
+        message.text = Buffer.from(encryptedText);
+        message.nonce = Buffer.from(nonce);
+        
+        return message;
+    }
+    
+    read (message) {
+        const require = this.require;
+        const nacl = require('tweetnacl');
+        
+        const arr = nacl.box.open(
+            new Uint8Array(message.text),
+            new Uint8Array(message.nonce),
+            this.naclPublic,
+            this.naclSecret,
+        );
+        
+        return Buffer.from(arr).toString('utf-8');
+    }
+}
+
+module.exports = {
+    KeyPairHelper,
+};

+ 87 - 0
src/backend/src/modules/broadcast/connection/SLink.js

@@ -0,0 +1,87 @@
+const { BaseLink } = require("./BaseLink");
+const { KeyPairHelper } = require("./KeyPairHelper");
+
+class SLink extends BaseLink {
+    static AUTHENTICATING = {
+        on_message (data) {
+            if ( data.$ !== 'take-my-key' ) {
+                this.disconnect();
+                return;
+            }
+
+            const trustedKeys = this.trustedKeys;
+            
+            const hasKey = trustedKeys[data.key];
+            if ( ! hasKey ) {
+                this.disconnect();
+                return;
+            }
+            
+            const is_trusted = trustedKeys.hasOwnProperty(data.key)
+            if ( ! is_trusted ) {
+                this.disconnect();
+                return;
+            }
+
+            const kp_helper = new KeyPairHelper({
+                kpublic: data.key,
+                ksecret: this.keys.secret,
+            });
+            
+            const message = kp_helper.read(data.message);
+            this.aesKey = Buffer.from(message, 'base64');
+            
+            this.state = this.constructor.ONLINE;
+        }
+    };
+    static ONLINE = {
+        on_message (data) {
+            const require = this.require;
+            const crypto = require('crypto');
+            const decipher = crypto.createDecipheriv(
+                'aes-256-cbc',
+                this.aesKey,
+                data.iv,
+            )
+            const buffers = [];
+            buffers.push(decipher.update(data.message));
+            buffers.push(decipher.final());
+            
+            const rawjson = Buffer.concat(buffers).toString('utf-8');
+            
+            const output = JSON.parse(rawjson);
+            
+            this.channels.message.emit(output);
+        }
+    }
+    static OFFLINE = {
+        on_message () {
+            throw new Error('unexpected message');
+        }
+    }
+
+    _send () {
+        // TODO: implement as a fallback
+        throw new Error('cannot send via SLink yet');
+    }
+
+    constructor ({
+        keys,
+        trustedKeys,
+        socket,
+    }) {
+        super();
+        this.state = this.constructor.AUTHENTICATING;
+        // Keys of server (local)
+        this.keys = keys;
+        // Allowed client keys (remote)
+        this.trustedKeys = trustedKeys;
+        this.socket = socket;
+
+        socket.on('message', data => {
+            this.state.on_message.call(this, data);
+        });
+    }
+}
+
+module.exports = { SLink };

+ 46 - 0
src/backend/src/traits/ChannelFeature.js

@@ -0,0 +1,46 @@
+// name: 'Channel' does not behave the same as Golang's channel construct; it
+//   behaves more like an EventEmitter.
+class Channel {
+    constructor () {
+        this.listeners_ = [];
+    }
+
+    // compare(EventService): EventService has an 'on' method,
+    //   but it accepts a 'selector' argument to narrow the scope of events
+    on (callback) {
+        // wet: EventService also creates an object like this
+        const det = {
+            detach: () => {
+                const idx = this.listeners_.indexOf(callback);
+                if ( idx !== -1 ) {
+                    this.listeners_.splice(idx, 1);
+                }
+            }
+        };
+
+        this.listeners_.push(callback);
+
+        return det;
+    }
+
+    emit (...a) {
+        for ( const lis of this.listeners_ ) {
+            lis(...a);
+        }
+    }
+}
+
+class ChannelFeature {
+    install_in_instance (instance) {
+        const channels = instance._get_merged_static_array('CHANNELS');
+
+        instance.channels = {};
+        for ( const name of channels ) {
+            instance.channels[name] = new Channel(name);
+        }
+    }
+}
+
+module.exports = {
+    ChannelFeature,
+};