diff --git a/src/client.js b/src/client.js index 4e0424f..0414146 100644 --- a/src/client.js +++ b/src/client.js @@ -1,6 +1,6 @@ var DB, _list; //container for database and _list (stored n serving) -global.INVALID = function(message) { +global.INVALID = function (message) { if (!(this instanceof INVALID)) return new INVALID(message); this.message = message; @@ -39,7 +39,7 @@ function processIncomingData(data) { console.debug(result); resolve(result); }).catch(error => { - console.debug(error); + (error instanceof INVALID ? console.debug : console.error)(error); reject(error); }); }; diff --git a/src/intra.js b/src/intra.js index 6f29104..8915592 100644 --- a/src/intra.js +++ b/src/intra.js @@ -1,5 +1,6 @@ 'use strict'; const WebSocket = require('ws'); +const keys = require('./keys'); //CONSTANTS const SUPERNODE_INDICATOR = '$', @@ -26,12 +27,12 @@ var DB, refresher; //container for database and refresher //List of node backups stored const _list = {}; Object.defineProperty(_list, 'delete', { - value: function(id) { + value: function (id) { delete this[id]; } }); Object.defineProperty(_list, 'get', { - value: function(keys = null) { + value: function (keys = null) { if (keys === null) keys = Object.keys(this); if (Array.isArray(keys)) return Object.fromEntries(keys.map(k => [k, this[k]])); @@ -40,12 +41,12 @@ Object.defineProperty(_list, 'get', { } }); Object.defineProperty(_list, 'stored', { - get: function() { + get: function () { return Object.keys(this); } }); Object.defineProperty(_list, 'serving', { - get: function() { + get: function () { let serveList = []; for (let id in this) if (this[id] === 0) @@ -58,7 +59,7 @@ Object.defineProperty(_list, 'serving', { function NodeContainer() { var _ws, _id, _onmessage, _onclose; Object.defineProperty(this, 'set', { - value: function(id, ws) { + value: function (id, ws) { if (_ws !== undefined) this.close(); _id = id; @@ -70,12 +71,12 @@ function NodeContainer() { } }); Object.defineProperty(this, 'id', { - get: function() { + get: function () { return _id; } }); Object.defineProperty(this, 'readyState', { - get: function() { + get: function () { if (_ws instanceof WebSocket) return _ws.readyState; else @@ -83,29 +84,29 @@ function NodeContainer() { } }); Object.defineProperty(this, 'send', { - value: function(packet) { + value: function (packet) { _ws.send(packet); } }); Object.defineProperty(this, 'onmessage', { - set: function(fn) { + set: function (fn) { if (fn instanceof Function) _onmessage = fn; } }); Object.defineProperty(this, 'onclose', { - set: function(fn) { + set: function (fn) { if (fn instanceof Function) _onclose = fn; } }); Object.defineProperty(this, 'is', { - value: function(ws) { + value: function (ws) { return ws === _ws; } }); Object.defineProperty(this, 'close', { - value: function() { + value: function () { if (_ws.readyState === 1) { _ws.onclose = () => console.warn('Closing: ' + _id); _ws.close(); @@ -126,17 +127,17 @@ _prevNode.onclose = evt => _prevNode.close(); //Packet processing const packet_ = {}; -packet_.construct = function(message) { +packet_.construct = function (message) { const packet = { - from: myFloID, + from: keys.node_id, message: message, time: Date.now() }; - packet.sign = floCrypto.signData(this.s(packet), myPrivKey); + packet.sign = floCrypto.signData(this.s(packet), keys.node_priv); return SUPERNODE_INDICATOR + JSON.stringify(packet); }; packet_.s = d => [JSON.stringify(d.message), d.time].join("|"); -packet_.parse = function(str) { +packet_.parse = function (str) { try { let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length)); let curTime = Date.now(); @@ -172,7 +173,7 @@ function connectToActiveNode(snID, reverse = false) { return new Promise((resolve, reject) => { if (!(snID in floGlobals.supernodes)) return reject(`${snID} is not a supernode`); - if (snID === myFloID) + if (snID === keys.node_id) return reject(`Reached end of circle. Next node avaiable is self`); connectToNode(snID) .then(ws => resolve(ws)) @@ -186,12 +187,12 @@ function connectToActiveNode(snID, reverse = false) { }; //Connect to next available node -function connectToNextNode(curNode = myFloID) { +function connectToNextNode(curNode = keys.node_id) { return new Promise((resolve, reject) => { - if (curNode === myFloID && !(myFloID in floGlobals.supernodes)) - return reject(`This (${myFloID}) is not a supernode`); + if (curNode === keys.node_id && !(keys.node_id in floGlobals.supernodes)) + return reject(`This (${keys.node_id}) is not a supernode`); let nextNodeID = cloud.nextNode(curNode); - if (nextNodeID === myFloID) + if (nextNodeID === keys.node_id) return reject("No other node online"); connectToNode(nextNodeID).then(ws => { _nextNode.set(nextNodeID, ws); @@ -209,7 +210,7 @@ function connectToNextNode(curNode = myFloID) { function connectToAliveNodes(nodes = null) { if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes); - nodes = nodes.filter(n => n !== myFloID); + nodes = nodes.filter(n => n !== keys.node_id); return new Promise((resolve, reject) => { Promise.allSettled(nodes.map(n => connectToNode(n))).then(results => { let ws_connections = {}; @@ -261,7 +262,7 @@ function processTaskFromNextNode(packet) { storeBackupData(task.data, from, packet); break; default: - console.log("Invalid task type:" + task.type + "from next-node"); + console.warn("Invalid task type:" + task.type + "from next-node"); }; }); }; @@ -299,7 +300,7 @@ function processTaskFromPrevNode(packet) { deleteMigratedData(task.data, from, packet); break; default: - console.log("Invalid task type:" + task.type + "from prev-node"); + console.warn("Invalid task type:" + task.type + "from prev-node"); }; }); }; @@ -325,7 +326,7 @@ function processTaskFromSupernode(packet, ws) { initiateRefresh(); break; default: - console.log("Invalid task type:" + task.type + "from super-node"); + console.warn("Invalid task type:" + task.type + "from super-node"); }; }); }; @@ -336,7 +337,7 @@ function processTaskFromSupernode(packet, ws) { //Acknowledge handshake function handshakeMid(id, ws) { if (_prevNode.id && _prevNode.id in floGlobals.supernodes) { - if (cloud.innerNodes(_prevNode.id, myFloID).includes(id)) { + if (cloud.innerNodes(_prevNode.id, keys.node_id).includes(id)) { //close existing prev-node connection _prevNode.send(packet_.construct({ type: RECONNECT_NEXT_NODE @@ -364,7 +365,7 @@ function handshakeMid(id, ws) { if (!_nextNode.id) reconnectNextNode(); //Reorder storelist - let nodes = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID), + let nodes = cloud.innerNodes(_prevNode.id, keys.node_id).concat(keys.node_id), req_sync = [], new_order = []; nodes.forEach(n => { @@ -384,7 +385,7 @@ function handshakeMid(id, ws) { handshakeMid.requestData(req_sync, new_order); }; -handshakeMid.requestData = function(req_sync, new_order) { +handshakeMid.requestData = function (req_sync, new_order) { if (handshakeMid.timeout) { clearTimeout(handshakeMid.timeout); delete handshakeMid.timeout; @@ -438,15 +439,15 @@ function reconnectNextNode() { if (_nextNode.id) _nextNode.close(); connectToNextNode() - .then(result => console.log(result)) + .then(result => console.debug(result)) .catch(error => { //Case: No other node is online console.info(error); //Serve all nodes for (let sn in floGlobals.supernodes) DB.createTable(sn) - .then(result => _list[sn] = 0) - .catch(error => console.error(error)); + .then(result => _list[sn] = 0) + .catch(error => console.error(error)); }); }; @@ -456,13 +457,13 @@ function reconnectNextNode() { function orderBackup(order) { let new_order = [], req_sync = []; - let cur_serve = cloud.innerNodes(_prevNode.id, myFloID).concat(myFloID); + let cur_serve = cloud.innerNodes(_prevNode.id, keys.node_id).concat(keys.node_id); for (let n in order) { if (!cur_serve.includes(n) && order[n] + 1 !== _list[n] && n in floGlobals.supernodes) { if (order[n] >= floGlobals.sn_config.backupDepth) DB.dropTable(n).then(_ => null) - .catch(error => console.error(error)) - .finally(_ => _list.delete(n)); + .catch(error => console.error(error)) + .finally(_ => _list.delete(n)); else if (order[n] >= 0) { if (_list[n] === undefined) req_sync.push(n); @@ -477,7 +478,7 @@ function orderBackup(order) { orderBackup.requestData(req_sync, new_order); }; -orderBackup.requestData = function(req_sync, new_order) { +orderBackup.requestData = function (req_sync, new_order) { Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => { let lastlogs = {}, @@ -522,13 +523,13 @@ function sendStoredData(lastlogs, node) { id: n, status: true })); - console.log(`START: ${n} data sync(send) to ${node.id}`); + console.info(`START: ${n} data sync(send) to ${node.id}`); //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => node.send(packet_.construct({ type: STORE_BACKUP_DATA, data: d }))); - console.log(`END: ${n} data sync(send) to ${node.id}`); + console.info(`END: ${n} data sync(send) to ${node.id}`); node.send(packet_.construct({ type: DATA_SYNC, id: n, @@ -541,7 +542,7 @@ function sendStoredData(lastlogs, node) { //Indicate sync of data function dataSyncIndication(snID, status, from) { - console.log(`${status ? 'START':'END'}: ${snID} data sync(receive) form ${from}`); + console.info(`${status ? 'START' : 'END'}: ${snID} data sync(receive) form ${from}`); }; //Store (backup) data @@ -633,7 +634,7 @@ function dataMigration(node_change, flag) { if (del_nodes.includes(_nextNode.id)) reconnectNextNode(); else { //reconnect next node if there are newly added nodes in between self and current next node - let innerNodes = cloud.innerNodes(myFloID, _nextNode.id); + let innerNodes = cloud.innerNodes(keys.node_id, _nextNode.id); if (new_nodes.filter(n => innerNodes.includes(n)).length) reconnectNextNode(); }; @@ -654,17 +655,17 @@ function dataMigration(node_change, flag) { }; //data migration sub-process: Deleted nodes -dataMigration.process_del = async function(del_nodes, old_kb) { +dataMigration.process_del = async function (del_nodes, old_kb) { if (!del_nodes.length) return; - let serve = _prevNode.id ? old_kb.innerNodes(_prevNode.id, myFloID) : _list.serving; + let serve = _prevNode.id ? old_kb.innerNodes(_prevNode.id, keys.node_id) : _list.serving; let process_nodes = del_nodes.filter(n => serve.includes(n)); if (process_nodes.length) { connectToAllActiveNodes().then(ws_connections => { let remaining = process_nodes.length; process_nodes.forEach(n => { DB.readAllData(n, 0).then(result => { - console.log(`START: Data migration for ${n}`); + console.info(`START: Data migration for ${n}`); //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => { let closest = cloud.closestNode(d.receiverID); @@ -681,7 +682,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) { data: d })); }); - console.log(`END: Data migration for ${n}`); + console.info(`END: Data migration for ${n}`); _list.delete(n); DB.dropTable(n).then(_ => null).catch(e => console.error(e)); remaining--; @@ -706,7 +707,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) { }; //data migration sub-process: Added nodes -dataMigration.process_new = async function(new_nodes) { +dataMigration.process_new = async function (new_nodes) { if (!new_nodes.length) return; connectToAllActiveNodes(new_nodes).then(ws_connections => { @@ -756,7 +757,7 @@ dataMigration.process_new = async function(new_nodes) { }); }; -dataMigration.intimateAllNodes = function() { +dataMigration.intimateAllNodes = function () { connectToAliveNodes().then(ws_connections => { let packet = packet_.construct({ type: INITIATE_REFRESH diff --git a/src/keys.js b/src/keys.js new file mode 100644 index 0000000..d24bd00 --- /dev/null +++ b/src/keys.js @@ -0,0 +1,37 @@ +'use strict'; + +const PRIV_EKEY_MIN = 32, + PRIV_EKEY_MAX = 48; + +var node_priv, e_key, node_id, node_pub; //containers for node-key wrapper +const _ = { + get node_priv() { + if (!node_priv || !e_key) + throw Error("keys not set"); + return Crypto.AES.decrypt(node_priv, e_key); + }, + set node_priv(key) { + node_pub = floCrypto.getPubKeyHex(key); + node_id = floCrypto.getFloID(node_pub); + if (!key || !node_pub || !node_id) + throw Error("Invalid Keys"); + let n = floCrypto.randInt(PRIV_EKEY_MIN, PRIV_EKEY_MAX) + e_key = floCrypto.randString(n); + node_priv = Crypto.AES.encrypt(key, e_key); + } +} + +module.exports = { + set node_priv(key) { + _.node_priv = key; + }, + get node_priv() { + return _.node_priv; + }, + get node_id() { + return node_id; + }, + get node_pub() { + return node_pub; + } +} \ No newline at end of file diff --git a/src/main.js b/src/main.js index 615baf3..789c701 100644 --- a/src/main.js +++ b/src/main.js @@ -1,4 +1,3 @@ -const config = require('../args/config.json'); global.floGlobals = require("./floGlobals"); require('./set_globals'); require('./lib'); @@ -9,16 +8,33 @@ const Database = require("./database"); const intra = require('./intra'); const client = require('./client'); const Server = require('./server'); +const keys = require("./keys"); var DB; //Container for Database object const INTERVAL_REFRESH_TIME = 1 * 60 * 60 * 1000; //1 hr function startNode() { - //Set myPrivKey, myPubKey, myFloID - global.myPrivKey = config["privateKey"]; - global.myPubKey = floCrypto.getPubKeyHex(config["privateKey"]); - global.myFloID = floCrypto.getFloID(config["privateKey"]); - console.info("Logged In as " + myFloID); + + const config = require(`../args/config.json`); + let _pass; + for (let arg of process.argv) + if (/^-password=/i.test(arg)) + _pass = arg.split(/=(.*)/s)[1]; + try { + let _tmp = require(`../args/keys.json`); + _tmp = floCrypto.retrieveShamirSecret(_tmp); + if (!_pass) { + console.error('Password not entered!'); + process.exit(1); + } + keys.node_priv = Crypto.AES.decrypt(_tmp, _pass); + } catch (error) { + console.error('Unable to load private key!'); + process.exit(1); + } + + console.info("Logged in as", keys.node_id); + //DB connect Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => { console.info("Connected to Database"); @@ -233,7 +249,7 @@ function diskCleanUp(base) { if (failed.length) { console.error(JSON.stringify(failed)); let success = results.length - failed.length; - reject(`Disk clean-up process has failed at ${100 * success/results.length}%. (Success:${success}|Failed:${failed.count})`); + reject(`Disk clean-up process has failed at ${100 * success / results.length}%. (Success:${success}|Failed:${failed.count})`); } else resolve("Disk clean-up process finished successfully (100%)"); }).catch(error => reject(error));