diff --git a/src/app.js b/src/app.js index ad37d27..9e216b4 100644 --- a/src/app.js +++ b/src/app.js @@ -81,16 +81,13 @@ module.exports = function App(secret, DB) { var periodInstance = null; let self = this; - //return server, express-app, wss + //return server, express-app Object.defineProperty(self, "server", { get: () => server }); Object.defineProperty(self, "express", { get: () => app }); - Object.defineProperty(self, "wss", { - get: () => wss - }); //set trustedID for subAdmin requests Object.defineProperty(self, "trustedIDs", { @@ -100,17 +97,13 @@ module.exports = function App(secret, DB) { //Start (or) Stop servers self.start = (port) => new Promise(resolve => { server = app.listen(port, () => { - wss = new WebSocket.Server({ - server - }); resolve(`Server Running at port ${port}`); }); }); self.stop = () => new Promise(resolve => { server.close(() => { server = null; - wss = null; - resolve('Server stopped') + resolve('Server stopped'); }); }); diff --git a/src/backup/storage.js b/src/backup/storage.js index 9793512..2210413 100644 --- a/src/backup/storage.js +++ b/src/backup/storage.js @@ -4,22 +4,36 @@ const WAIT_TIME = 30 * 60 * 1000, BACKUP_INTERVAL = 10 * 60 * 1000; var DB; //Container for Database connection -var masterWS; //Container for Master websocket connection +var masterWS = null; //Container for Master websocket connection var intervalID = null; -function startIntervalSync(ws) { +function startSlaveProcess(ws) { + if (!ws) throw Error("Master WS connection required"); + //stop existing process + stopSlaveProcess(); //set masterWS ws.on('message', processDataFromMaster); masterWS = ws; - //stop existing sync - stopIntervalSync(); + //inform master + let message = { + floID: global.floID, + pubKey: global.pubKey, + req_time: Date.now(), + type: "SLAVE_CONNECT" + } + message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); //start sync requestInstance.open(); intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL); } -function stopIntervalSync() { +function stopSlaveProcess() { + if (masterWS !== null) { + masterWS.onclose = () => null; + masterWS.close(); + masterWS = null; + } if (intervalID !== null) { clearInterval(intervalID); intervalID = null; @@ -40,13 +54,13 @@ function requestBackupSync(ws) { subs.push(`SELECT MAX(${tables[t]}) as ts FROM ${t}`); DB.query(`SELECT MAX(ts) as last_time FROM (${subs.join(' UNION ')}) AS Z`).then(result => { let request = { - floID: myFloID, + floID: global.myFloID, pubKey: myPubKey, type: "BACKUP_SYNC", last_time: result[0].last_time, req_time: Date.now() }; - request.sign = floCrypto.signData(request.type + "|" + request.req_time, myPrivKey); + request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); console.debug("REQUEST: ", request); ws.send(JSON.stringify(request)); resolve(request); @@ -101,17 +115,46 @@ function processDataFromMaster(message) { try { message = JSON.parse(message); console.debug(message); - if (message.mode.startsWith("SYNC")) + if (message.command.startsWith("SYNC")) processBackupData(message); + else switch (message.command) { + case "SINK_SHARE": + storeSinkShare(message); + break; + case "SEND_SHARE": + sendSinkShare(); + break; + } } catch (error) { console.error(error); } } +function storeSinkShare(message) { + console.debug(Date.now(), '|sinkID:', message.sinkID, '|share:', message.keyShare); + DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?)", [message.sinkID, message.keyShare]) + .then(_ => null).catch(error => console.error(error)); +} + +function sendSinkShare() { + DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1").then(result => { + let response = { + type: "SINK_SHARE", + sinkID: result[0].floID, + share: result[0].share, + floID: global.myFloID, + pubKey: global.pubKey, + req_time: Date.now() + } + response.sign = floCrypto.signData(response.type + "|" + response.req_time, global.myPrivKey); //TODO: strengthen signature + masterWS.send(JSON.stringify(response)); + }).catch(error => console.error(error)); +} + function processBackupData(response) { const self = requestInstance; self.last_response_time = Date.now(); - switch (response.mode) { + switch (response.command) { case "SYNC_ERROR": console.log(response.error); self.close(); @@ -201,6 +244,9 @@ module.exports = { set DB(db) { DB = db; }, - start: startIntervalSync, - stop: stopIntervalSync + get masterWS() { + return masterWS; + }, + start: startSlaveProcess, + stop: stopSlaveProcess } \ No newline at end of file diff --git a/src/backup/transmit.js b/src/backup/transmit.js index 1b03cdd..d327b27 100644 --- a/src/backup/transmit.js +++ b/src/backup/transmit.js @@ -1,7 +1,15 @@ 'use strict'; -const shareThreshold = 70 / 100; +const slave = require('./storage'); +const WebSocket = require('ws'); +const shareThreshold = 50 / 100; -var DB; //Container for database +var DB, app, wss; //Container for database and app +var nodeShares = null, + nodeSinkID = null, + connectedSlaves = {}, + mod = null; +const SLAVE_MODE = 0, + MASTER_MODE = 1; //Backup Transfer function sendBackup(timestamp, ws) { @@ -19,14 +27,14 @@ function sendBackup(timestamp, ws) { if (failedSync.length) { console.info("Backup Sync Failed:", failedSync); ws.send(JSON.stringify({ - mode: "SYNC_END", + command: "SYNC_END", status: false, info: failedSync })); } else { console.info("Backup Sync completed"); ws.send(JSON.stringify({ - mode: "SYNC_END", + command: "SYNC_END", status: true })); } @@ -37,7 +45,7 @@ function send_deleteSync(timestamp, ws) { return new Promise((resolve, reject) => { DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { ws.send(JSON.stringify({ - mode: "SYNC_DELETE", + command: "SYNC_DELETE", delete_data: result })); resolve("deleteSync"); @@ -54,7 +62,7 @@ function send_dataSync(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "SYNC_ADD_UPDATE", + command: "SYNC_ADD_UPDATE", data })); res(table); @@ -68,7 +76,7 @@ function send_dataSync(timestamp, ws) { let sync_needed = {}; result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); ws.send(JSON.stringify({ - mode: "SYNC_ADD_UPDATE_HEADER", + command: "SYNC_ADD_UPDATE_HEADER", add_data: result })); let promises = []; @@ -101,7 +109,7 @@ function send_dataImmutable(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "SYNC_ADD_IMMUTABLE", + command: "SYNC_ADD_IMMUTABLE", data })); res(table); @@ -129,7 +137,7 @@ function send_dataImmutable(timestamp, ws) { //Shares function generateNewSink() { let sink = floCrypto.generateNewID(); - let nextNodes = KB.nextNode(myFloID, null); + let nextNodes = KB.nextNode(global.myFloID, null); let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); sink.shares = {}; for (let i in nextNodes) @@ -137,7 +145,7 @@ function generateNewSink() { return sink; } -function sendShare(ws, sinkID, share) { +function sendShare(ws, sinkID, keyShare) { ws.send(JSON.stringify({ command: "SINK_SHARE", sinkID, @@ -145,9 +153,142 @@ function sendShare(ws, sinkID, share) { })); } +function sendSharesToNodes(sinkID, shares) { + nodeSinkID = sinkID; + nodeShares = shares; + for (let node in shares) + if (node in connectedSlaves) + sendShare(connectedSlaves[node], sinkID, shares[node]); +} + +function storeSink(sinkID, sinkPrivKey) { + global.sinkID = sinkID; + global.sinkPrivKey = sinkPrivKey; + let encryptedKey = Crypto.AES.encrypt(sinkPrivKey, global.myPrivKey); + DB.query('INSERT INTO sinkShares (floID, share) VALUE (?, ?)', [sinkID, '$$$' + encryptedKey]) + .then(_ => console.log('SinkID:', sinkID, '|SinkEnKey:', encryptedKey)) + .catch(error => console.error(error)); +} + +function transferMoneyToNewSink(oldSinkID, oldSinkKey) { + return new Promise((resolve, reject) => { + let newSink = generateNewSink(); + floBlockchainAPI.getBalance(oldSinkID).then(balFLO => { + tokenAPI.getBalance(oldSinkID).then(balRupee => { + floBlockchainAPI.sendTx(oldSinkID, newSink.floID, balFLO - floGlobals.fee, oldSinkKey, `send ${balRupee} ${floGlobals.token}# |Exchange-market New sink`) + .then(result => resolve(newSink)) + .catch(error => reject(error)) + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + +function collectShares(floID, sinkID, share) { + if (!collectShares.sinkID) { + collectShares.sinkID = sinkID; + collectShares.shares = {}; + } else if (collectShares.sinkID !== sinkID) + return console.error("Something is wrong! Slaves are sending different sinkID"); + collectShares.shares[floID] = share; + try { + let privKey = floCrypto.retrieveShamirSecret(Object.values(collectShares.shares)); + if (floCrypto.verifyPrivKey(privKey, collectShares.sinkID)) { + transferMoneyToNewSink(collectShares.sinkID, privKey).then(newSink => { + delete collectShares.sinkID; + delete collectShares.shares; + collectShares.active = false; + storeSink(newSink.floID, newSink.privKey); + sendSharesToNodes(newSink.floID, newSink.shares); + }).catch(error => console.error(error)); + } + } catch (error) { + //Unable to retrive sink private key. Waiting for more shares! Do nothing for now + }; +} + +function connectWS(floID) { + let url = nodeURL[floID]; + return new Promise((resolve, reject) => { + const ws = new WebSocket(url); + ws.on('open', _ => resolve(ws)); + ws.on('error', _ => reject(error)); + }) +} + +function connectToMaster(i = 0) { + if (i >= nodeList.length) { + console.error("No master is found, and myFloID is not in list. This should not happen!"); + process.exit(1); + } + let floID = nodeList[i]; + if (floID === myFloID) + serveAsMaster(); + else + connectWS(floID).then(ws => { + ws.floID = floID; + ws.onclose = () => connectToMaster(i); + serveAsSlave(ws); + }).catch(error => { + console.log(`Node(${floID}) is offline`); + connectToMaster(i + 1) + }); +} + +//Node becomes master +function serveAsMaster() { + app.resume(); + slave.stop(); + mod = MASTER_MODE; + informLiveNodes(); + collectShares.active = true; +} + +function serveAsSlave(ws) { + app.pause(); + slave.start(ws); + mod = SLAVE_MODE; +} + +function informLiveNodes() { + let message = { + floID: global.myFloID, + type: "UPDATE_MASTER", + pubKey: global.myPubKey, + req_time: Date.now() + }; + message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); + message = JSON.stringify(message); + for (let n in nodeURL) + if (n !== global.myFloID) + connectWS(n).then(ws => { + ws.send(message); + ws.close(); + }).catch(error => console.warn(`Node ${n} is offline`)); +} + +function updateMaster(floID) { + let currentMaster = mod === MASTER_MODE ? global.floID : slave.masterWS.floID; + if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster)) + connectToMaster(); +} + +function slaveConnect(floID, ws) { + ws.floID = floID; + connectedSlaves[floID] = ws; + if (collectShares.active) + ws.send(JSON.stringify({ + command: "SEND_SHARE" + })); + else if (nodeShares[floID]) + sendShare(ws, nodeSinkID, nodeShares[floID]); +} + //Transmistter var nodeList; //Container for (backup) node list -function startBackupTransmitter(wss) { +function startBackupTransmitter(server) { + wss = new WebSocket.Server({ + server + }); wss.on('connection', ws => { ws.on('message', message => { //verify if from a backup node @@ -155,38 +296,64 @@ function startBackupTransmitter(wss) { let invalid = null, request = JSON.parse(message); console.debug(request); - if (!backupIDs.includes(request.floID)) - invalid = "FLO ID not approved for backup"; + if (!nodeList.includes(request.floID)) + invalid = `floID ${request.floID} not in nodeList`; else if (request.floID !== floCrypto.getFloID(request.pubKey)) invalid = "Invalid pubKey"; else if (!floCrypto.verifySign(request.type + "|" + request.req_time, request.sign, request.pubKey)) invalid = "Invalid signature"; - else if (request.type !== "BACKUP_SYNC") - invalid = "Invalid Request Type"; //TODO: check if request time is valid; + else switch (request.type) { + case "BACKUP_SYNC": + sendBackup(request.last_time, ws); + break; + case "UPDATE_MASTER": + updateMaster(request.floID); + break; + case "SLAVE_CONNECT": + slaveConnect(request.floID, ws); + break; + case "SINK_SHARE": + collectShares(request.floID, request.sinkID, request.share) + default: + invalid = "Invalid Request Type"; + } if (invalid) ws.send(JSON.stringify({ error: invalid })); - else - sendBackup(request.last_time, ws); } catch (error) { console.error(error); ws.send(JSON.stringify({ - mode: "SYNC_ERROR", + command: "SYNC_ERROR", error: 'Unable to process the request!' })); } }); + ws.on('close', () => { + // remove from connected slaves (if needed) + if (ws.floID in connectedSlaves) + delete connectedSlaves[ws.floID]; + }) }); } +function initProcess(a) { + app = a; + startBackupTransmitter(app.server); + connectToMaster(); +} + module.exports = { - init: startBackupTransmitter, + init: initProcess, set nodeList(ids) { nodeList = ids; }, set DB(db) { DB = db; + slave.DB = db; + }, + get wss() { + return wss; } }; \ No newline at end of file diff --git a/src/main.js b/src/main.js index 5166cf4..81c5c45 100644 --- a/src/main.js +++ b/src/main.js @@ -4,13 +4,13 @@ require('./set_globals'); require('./lib'); require('./floCrypto'); require('./floBlockchainAPI'); +require('./tokenAPI'); const Database = require("./database"); const App = require('./app'); const PORT = config['port']; const K_Bucket = require('./backup/KBucket'); -const slave = require('./backup/storage'); const transmit = require('./backup/transmit'); var DB, app; @@ -134,48 +134,9 @@ loadDataFromDB.trustedIDs = function() { function setDB(db) { DB = db; - slave.DB = DB; transmit.DB = DB; } -function connectWS(floID) { - let url = nodeURL[floID]; - return new Promise((resolve, reject) => { - const ws = new WebSocket(url); - ws.on('open', _ => resolve(ws)); - ws.on('error', _ => reject(error)); - }) -} - -function connectToMaster(i = 0) { - if (i >= nodeList.length) { - console.error("No master is found, and myFloID is not in list. This should not happen!"); - process.exit(1); - } - let floID = nodeList[i]; - if (floID === myFloID) - serveAsMaster(); - else - connectWS(floID).then(ws => { - ws.floID = floID; - ws.onclose = () => connectToMaster(i); - serveAsSlave(ws); - }).catch(error => { - console.log(`Node(${floID}) is offline`); - connectToMaster(i + 1) - }); -} - -function serveAsMaster() { - app.resume(); - slave.stop(); -} - -function serveAsSlave(masterWS) { - app.pause(); - slave.start(masterWS); -} - module.exports = function startServer(public_dir) { try { var _tmp = require('../args/keys.json'); @@ -208,8 +169,7 @@ module.exports = function startServer(public_dir) { refreshData(true).then(_ => { app.start(PORT).then(result => { console.log(result); - transmit.init(app.wss); - connectToMaster(); + transmit.init(app); }).catch(error => console.error(error)) }).catch(error => console.error(error)) }).catch(error => console.error(error));