diff --git a/src/backup/head.js b/src/backup/head.js index c32ae96..5907019 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -8,7 +8,6 @@ const shareThreshold = 50 / 100; var DB, app, wss, tokenList; //Container for database and app var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list var nodeShares = null, - nodeSinkID = null, connectedSlaves = {}, mod = null; const SLAVE_MODE = 0, @@ -100,28 +99,34 @@ function send_dataSync(timestamp, ws) { } //Shares -function generateNewSink() { - let sink = floCrypto.generateNewID(); - sink.shares = {}; - let nextNodes = nodeKBucket.nextNode(global.myFloID, null); - if (nextNodes.length) { - let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); +function generateShares(sinkKey) { + let nextNodes = nodeKBucket.nextNode(global.myFloID, null), + aliveNodes = Object.keys(connectedSlaves); + if (nextNodes.length == 0) //This is the last node in nodeList + return false; + else if (aliveNodes.length == 0) //This is the last node in nodeList + return null; + else { + let N = nextNodes.length + 1, + th = Math.ceil(aliveNodes.length * shareThreshold), + shares, refShare, mappedShares = {}; + shares = floCrypto.createShamirsSecretShares(sinkKey, N, th < 2 ? 2 : th); + refShare = shares.pop(); for (let i in nextNodes) - sink.shares[nextNodes[i]] = shares[i]; + mappedShares[nextNodes[i]] = [refShare, shares[i]].join("|"); + return mappedShares; } - return sink; } function sendShare(ws, sinkID, keyShare) { ws.send(JSON.stringify({ command: "SINK_SHARE", sinkID, - keyShare + keyShare: floCrypto.encryptData(keyShare, ws.pubKey) })); } function sendSharesToNodes(sinkID, shares) { - nodeSinkID = sinkID; nodeShares = shares; for (let node in shares) if (node in connectedSlaves) @@ -131,12 +136,13 @@ function sendSharesToNodes(sinkID, shares) { function storeSink(sinkID, sinkPrivKey) { global.sinkID = sinkID; global.sinkPrivKey = sinkPrivKey; - let encryptedKey = Crypto.AES.encrypt(sinkPrivKey, global.myPrivKey); - DB.query('INSERT INTO sinkShares (floID, share) VALUE (?, ?)', [sinkID, '$$$' + encryptedKey]) + let encryptedKey = Crypto.AES.encrypt(slave.SINK_KEY_INDICATOR + sinkPrivKey, global.myPrivKey); + DB.query('INSERT INTO sinkShares (floID, share) VALUE (?, ?) AS new ON DUPLICATE KEY UPDATE share=new.share', [sinkID, encryptedKey]) .then(_ => console.log('SinkID:', sinkID, '|SinkEnKey:', encryptedKey)) .catch(error => console.error(error)); } +/* function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) { const transferToken = token => new Promise((resolve, reject) => { tokenAPI.getBalance(oldSinkID, token).then(tokenBalance => { @@ -169,32 +175,44 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) { }); }) } +*/ -function collectShares(floID, sinkID, share) { - if (!collectShares.sinkID) { - collectShares.sinkID = sinkID; - collectShares.shares = {}; - } else if (collectShares.sinkID !== sinkID) +const collectShares = {}; +collectShares.retrive = function(floID, sinkID, share) { + const self = this; + if (!self.sinkID) { + self.sinkID = sinkID; + self.shares = {}; + } else if (self.sinkID !== sinkID) return console.error("Something is wrong! Slaves are sending different sinkID"); - collectShares.shares[floID] = share; + if (share.startsWith(slave.SINK_KEY_INDICATOR)) { + let sinkKey = share.substring(slave.SINK_KEY_INDICATOR.length); + console.debug("Received sinkKey:", sinkID, sinkKey); + self.verify(sinkKey); + } else + self.shares[floID] = share.split("|"); try { - let oldSinkKey = floCrypto.retrieveShamirSecret(Object.values(collectShares.shares)); - if (floCrypto.verifyPrivKey(oldSinkKey, collectShares.sinkID)) { - let newSink = generateNewSink(); - transferMoneyToNewSink(collectShares.sinkID, oldSinkKey, newSink).then(result => { - console.log("Money transfer successful", result); - delete collectShares.sinkID; - delete collectShares.shares; - collectShares.active = false; - sendSharesToNodes(newSink.floID, newSink.shares); - }).catch(error => console.error(error)) - .finally(_ => storeSink(newSink.floID, newSink.privKey)); - } - } catch (error) { + let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(self.shares))); + console.debug("Retrived sinkKey:", sinkID, sinkKey); + self.verify(sinkKey); + } catch { //Unable to retrive sink private key. Waiting for more shares! Do nothing for now }; } +collectShares.verify = function(sinkKey) { + const self = this; + if (floCrypto.verifyPrivKey(sinkKey, self.sinkID)) { + let sinkID = self.sinkID; + console.log("Shares collected successfully for", sinkID); + self.active = false; + delete self.sinkID; + delete self.shares; + storeSink(sinkID, sinkKey); + sendSharesToNodes(sinkID, generateShares(sinkKey)); + } +} + function connectWS(floID) { let url = nodeURL[floID]; return new Promise((resolve, reject) => { @@ -259,29 +277,48 @@ function informLiveNodes(init) { flag = true; } else console.warn(`Node(${nodes[i]}) is offline`); - if (init) { - if (flag === true) { - collectShares.active = true; - syncRequest(); - } else { - //No other node is active (possible 1st node to start exchange) - console.debug("Starting the exchange...") - let newSink = generateNewSink(); - storeSink(newSink.floID, newSink.privKey); - sendSharesToNodes(newSink.floID, newSink.shares); - } - } else { - collectShares.active = true; - DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1") - .then(result => collectShares(global.myFloID, result[0].floID, result[0].share)) - .catch(error => console.error(error)) - } + if (init && flag) + syncRequest(); + //Check if sinkKey or share available in DB + DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1").then(result => { + if (result.length) { + let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey); + if (share.startsWith(slave.SINK_KEY_INDICATOR)) { + //sinkKey is already present in DB, use it directly + collectShares.active = false; + global.sinkPrivKey = share.substring(slave.SINK_KEY_INDICATOR.length); + global.sinkID = floCrypto.getFloID(global.sinkPrivKey); + if (global.sinkID != result[0].floID) { + console.warn("sinkID and sinkKey in DB are not pair!"); + storeSink(global.sinkID, global.sinkPrivKey); + } + console.debug("Loaded sinkKey:", global.sinkID, global.sinkPrivKey) + sendSharesToNodes(global.sinkID, generateShares(global.sinkPrivKey)) + } else { + //Share is present in DB, try to collect remaining shares and retrive sinkKey + collectShares.active = true; + collectShares.retrive(global.myFloID, result[0].floID, share); + } + } else if (init) { + if (flag) //Other nodes online, try to collect shares and retrive sinkKey + collectShares.active = true; + else { + //No other node is active (possible 1st node to start exchange) + console.log("Starting the exchange..."); + collectShares.active = false; + let newSink = floCrypto.generateNewID(); + console.debug("Generated sinkKey:", newSink.floID, newSink.privKey); + storeSink(newSink.floID, newSink.privKey); + sendSharesToNodes(newSink.floID, generateShares(newSink.privKey)); + } + } else //This should not happen! + console.error("Something is wrong! Node is not starting and no key/share present in DB"); + }).catch(error => console.error(error)); }); } function syncRequest(cur = global.myFloID) { //Sync data from next available node - console.debug("syncRequest", cur); let nextNode = nodeKBucket.nextNode(cur); if (!nextNode) return console.warn("No nodes available to Sync"); @@ -296,15 +333,20 @@ function updateMaster(floID) { connectToMaster(); } -function slaveConnect(floID, ws) { +function slaveConnect(floID, pubKey, ws) { ws.floID = floID; + ws.pubKey = pubKey; connectedSlaves[floID] = ws; if (collectShares.active) ws.send(JSON.stringify({ - command: "SEND_SHARE" + command: "SEND_SHARE", + pubKey: global.myPubKey })); + else if (nodeShares === null || //The 1st backup is connected + Object.keys(connectedSlaves).length < Math.pow(shareThreshold, 2) * Object.keys(nodeShares).length) //re-calib shares for better + sendSharesToNodes(global.sinkID, generateShares(global.sinkPrivKey)) else if (nodeShares[floID]) - sendShare(ws, nodeSinkID, nodeShares[floID]); + sendShare(ws, global.sinkID, nodeShares[floID]); } //Transmistter @@ -334,21 +376,24 @@ function startBackupTransmitter(server) { updateMaster(request.floID); break; case "SLAVE_CONNECT": - slaveConnect(request.floID, ws); + slaveConnect(request.floID, request.pubKey, ws); break; case "SINK_SHARE": - collectShares(request.floID, request.sinkID, request.share) + collectShares.retrive(request.floID, request.sinkID, floCrypto.decryptData(request.share, global.myPrivKey)) default: invalid = "Invalid Request Type"; } if (invalid) ws.send(JSON.stringify({ + type: request.type, + command: "REQUEST_ERROR", error: invalid })); } catch (error) { console.error(error); ws.send(JSON.stringify({ - command: "SYNC_ERROR", + type: request.type, + command: "REQUEST_ERROR", error: 'Unable to process the request!' })); } @@ -373,7 +418,6 @@ module.exports = { nodeURL = list; nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); nodeList = nodeKBucket.order; - console.debug(nodeList); }, set assetList(assets) { tokenList = assets.filter(a => a.toUpperCase() !== "FLO"); diff --git a/src/backup/slave.js b/src/backup/slave.js index 4ab0ffa..cd4a4f5 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -1,7 +1,8 @@ 'use strict'; const WAIT_TIME = 10 * 60 * 1000, - BACKUP_INTERVAL = 1 * 60 * 1000; + BACKUP_INTERVAL = 1 * 60 * 1000, + SINK_KEY_INDICATOR = '$$$'; var DB; //Container for Database connection var masterWS = null; //Container for Master websocket connection @@ -18,14 +19,15 @@ function startSlaveProcess(ws) { //inform master let message = { floID: global.myFloID, - pubKey: global.pubKey, + pubKey: global.myPubKey, req_time: Date.now(), type: "SLAVE_CONNECT" } message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); + ws.send(JSON.stringify(message)); //start sync requestInstance.open(); - intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL); + intervalID = setInterval(() => requestInstance.open(), BACKUP_INTERVAL); } function stopSlaveProcess() { @@ -46,7 +48,7 @@ function requestBackupSync(ws) { DB.query('SELECT MAX(timestamp) as last_time FROM _backup').then(result => { let request = { floID: global.myFloID, - pubKey: myPubKey, + pubKey: global.myPubKey, type: "BACKUP_SYNC", last_time: result[0].last_time, req_time: Date.now() @@ -93,6 +95,7 @@ requestInstance.open = function(ws = null) { self.request = request; self.cache = []; self.add_count = self.delete_count = 0; + self.last_response_time = Date.now(); self.ws = ws; }).catch(error => console.error(error)) } @@ -116,7 +119,7 @@ requestInstance.close = function() { function processDataFromMaster(message) { try { message = JSON.parse(message); - console.debug("Master:", message); + //console.debug("Master:", message); if (message.command.startsWith("SYNC")) processBackupData(message); else switch (message.command) { @@ -124,7 +127,12 @@ function processDataFromMaster(message) { storeSinkShare(message); break; case "SEND_SHARE": - sendSinkShare(); + sendSinkShare(message.pubKey); + break; + case "REQUEST_ERROR": + console.log(message.error); + if (message.type === "BACKUP_SYNC") + requestInstance.close(); break; } } catch (error) { @@ -133,19 +141,25 @@ function processDataFromMaster(message) { } function storeSinkShare(message) { - console.debug(Date.now(), '|sinkID:', message.sinkID, '|share:', message.keyShare); - DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?)", [message.sinkID, message.keyShare]) + let encryptedShare = Crypto.AES.encrypt(floCrypto.decryptData(message.keyShare, global.myPrivKey), global.myPrivKey); + console.debug(Date.now(), '|sinkID:', message.sinkID, '|EnShare:', encryptedShare); + DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?) AS new ON DUPLICATE KEY UPDATE share=new.share", [message.sinkID, encryptedShare]) .then(_ => null).catch(error => console.error(error)); } -function sendSinkShare() { +function sendSinkShare(pubKey) { DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1").then(result => { + if (!result.length) + return console.warn("No key-shares in DB!"); + let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey); + if (share.startsWith(SINK_KEY_INDICATOR)) + console.warn("Key is stored instead of share!"); let response = { type: "SINK_SHARE", sinkID: result[0].floID, - share: result[0].share, + share: floCrypto.encryptData(share, pubKey), floID: global.myFloID, - pubKey: global.pubKey, + pubKey: global.myPubKey, req_time: Date.now() } response.sign = floCrypto.signData(response.type + "|" + response.req_time, global.myPrivKey); //TODO: strengthen signature @@ -160,10 +174,6 @@ function processBackupData(response) { const self = requestInstance; self.last_response_time = Date.now(); switch (response.command) { - case "SYNC_ERROR": - console.log(response.error); - self.close(); - break; case "SYNC_END": if (response.status) { if (self.total_add !== self.add_count) @@ -182,7 +192,7 @@ function processBackupData(response) { break; case "SYNC_HEADER": self.add_data = response.add_data; - self.total_add = Object.keys(response.add_data).length; + self.total_add = new Set(response.add_data.map(a => a.t_name)).size; break; case "SYNC_UPDATE": self.add_count += 1; @@ -229,7 +239,7 @@ function storeBackupData(cache_promises, add_header, delete_header) { console.error(error); console.warn("ABORT: BackupCache -> Tables") }) - }).catch(error => reject(error)) + }) } storeBackupData.commit = function(data, result) { @@ -287,6 +297,7 @@ function updateTableData(table, data) { const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val; module.exports = { + SINK_KEY_INDICATOR, set DB(db) { DB = db; }, diff --git a/src/floCrypto.js b/src/floCrypto.js index 1c69e31..dacf98c 100644 --- a/src/floCrypto.js +++ b/src/floCrypto.js @@ -44,8 +44,8 @@ }, getSenderPublicKeyString: function() { - privateKey = ellipticCurveEncryption.senderRandom(); - senderPublicKeyString = ellipticCurveEncryption.senderPublicString(privateKey); + let privateKey = ellipticCurveEncryption.senderRandom(); + var senderPublicKeyString = ellipticCurveEncryption.senderPublicString(privateKey); return { privateKey: privateKey, senderPublicKeyString: senderPublicKeyString @@ -80,8 +80,8 @@ //If the private key corresponded to a compressed public key, also drop the last byte (it should be 0x01). if (isPubKeyCompressed == true) pk.pop() pk.unshift(0) - privateKeyDecimal = BigInteger(pk).toString() - privateKeyHex = Crypto.util.bytesToHex(pk) + let privateKeyDecimal = BigInteger(pk).toString() + let privateKeyHex = Crypto.util.bytesToHex(pk) return { privateKeyDecimal: privateKeyDecimal, privateKeyHex: privateKeyHex