Improved sink-key management
- multiple sink keys supported - sink keys are split and shared among nodes (master and slaves) - when a key is required, master collects the shares from slaves and reconstructs. (key is removed from memory after usage)
This commit is contained in:
parent
60a574a905
commit
65f7a5d94e
@ -27,7 +27,6 @@ module.exports = {
|
||||
backup: {
|
||||
SHARE_THRESHOLD: 50 / 100, //50%
|
||||
HASH_N_ROW: 100,
|
||||
SINK_KEY_INDICATOR: '$$$',
|
||||
BACKUP_INTERVAL: 5 * 60 * 1000, //5 min
|
||||
BACKUP_SYNC_TIMEOUT: 10 * 60 * 1000, //10 mins
|
||||
CHECKSUM_INTERVAL: 100, //times of BACKUP_INTERVAL
|
||||
|
||||
@ -6,39 +6,39 @@ const sync = require('./sync');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const {
|
||||
SINK_KEY_INDICATOR,
|
||||
SHARE_THRESHOLD
|
||||
} = require("../_constants")["backup"];
|
||||
|
||||
var DB, app, wss, tokenList; //Container for database and app
|
||||
var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list
|
||||
var nodeShares = null,
|
||||
connectedSlaves = {},
|
||||
mod = null;
|
||||
const connectedSlaves = {},
|
||||
shares_collected = {},
|
||||
shares_pending = {};
|
||||
var _mode = null;
|
||||
const SLAVE_MODE = 0,
|
||||
MASTER_MODE = 1;
|
||||
|
||||
const sinkList = {};
|
||||
|
||||
//Shares
|
||||
function generateShares(sinkKey) {
|
||||
let nextNodes = nodeKBucket.nextNode(global.myFloID, null),
|
||||
aliveNodes = Object.keys(connectedSlaves);
|
||||
if (nextNodes.length == 0) //This is the last node in nodeList
|
||||
return false;
|
||||
else if (aliveNodes.length == 0) //This is the last node in nodeList
|
||||
return null;
|
||||
else {
|
||||
let N = nextNodes.length + 1,
|
||||
th = Math.ceil(aliveNodes.length * SHARE_THRESHOLD) + 1,
|
||||
shares, refShare, mappedShares = {};
|
||||
shares = floCrypto.createShamirsSecretShares(sinkKey, N, th);
|
||||
refShare = shares.pop();
|
||||
for (let i in nextNodes)
|
||||
mappedShares[nextNodes[i]] = [refShare, shares[i]].join("|");
|
||||
return mappedShares;
|
||||
}
|
||||
nextNodes.unshift(global.myFloID);
|
||||
aliveNodes.unshift(global.myFloID);
|
||||
let N = nextNodes.length + 1,
|
||||
th = Math.ceil(aliveNodes.length * SHARE_THRESHOLD) + 1,
|
||||
shares, refShare, mappedShares = {};
|
||||
shares = floCrypto.createShamirsSecretShares(sinkKey, N, th);
|
||||
refShare = shares.pop();
|
||||
for (let i in nextNodes)
|
||||
mappedShares[nextNodes[i]] = [refShare, shares[i]].join("|");
|
||||
return mappedShares;
|
||||
}
|
||||
|
||||
function sendShare(ws, sinkID, keyShare) {
|
||||
if (shares_pending[sinkID][ws.floID] === keyShare) //this should always be true unless there is an error
|
||||
delete shares_pending[sinkID][ws.floID]; //delete the share after sending it to respective slave
|
||||
ws.send(JSON.stringify({
|
||||
command: "SINK_SHARE",
|
||||
sinkID,
|
||||
@ -47,19 +47,23 @@ function sendShare(ws, sinkID, keyShare) {
|
||||
}
|
||||
|
||||
function sendSharesToNodes(sinkID, shares) {
|
||||
nodeShares = shares;
|
||||
shares_pending[sinkID] = shares;
|
||||
for (let node in shares)
|
||||
if (node in connectedSlaves)
|
||||
sendShare(connectedSlaves[node], sinkID, shares[node]);
|
||||
if (global.myFloID in shares) {
|
||||
slave.storeShare(sinkID, shares[global.myFloID], false);
|
||||
delete shares_pending[sinkID][global.myFloID];
|
||||
}
|
||||
sinkList[sinkID] = Date.now();
|
||||
}
|
||||
|
||||
function storeSink(sinkID, sinkPrivKey) {
|
||||
global.sinkID = sinkID;
|
||||
global.sinkPrivKey = sinkPrivKey;
|
||||
let encryptedKey = Crypto.AES.encrypt(SINK_KEY_INDICATOR + sinkPrivKey, global.myPrivKey);
|
||||
DB.query('INSERT INTO sinkShares (floID, share) VALUE (?, ?) ON DUPLICATE KEY UPDATE share=?', [sinkID, encryptedKey, encryptedKey])
|
||||
.then(_ => console.log('SinkID:', sinkID, '|SinkEnKey:', encryptedKey))
|
||||
.catch(error => console.error(error));
|
||||
function requestShare(ws, sinkID) {
|
||||
ws.send(JSON.stringify({
|
||||
command: "SEND_SHARE",
|
||||
sinkID: sinkID,
|
||||
pubKey: global.myPubKey
|
||||
}));
|
||||
}
|
||||
|
||||
/*
|
||||
@ -97,42 +101,42 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) {
|
||||
}
|
||||
*/
|
||||
|
||||
const collectShares = {};
|
||||
collectShares.retrive = function(floID, sinkID, share) {
|
||||
const self = this;
|
||||
if (!self.sinkID) {
|
||||
self.sinkID = sinkID;
|
||||
self.shares = {};
|
||||
} else if (self.sinkID !== sinkID)
|
||||
return console.error("Something is wrong! Slaves are sending different sinkID");
|
||||
if (share.startsWith(SINK_KEY_INDICATOR)) {
|
||||
let sinkKey = share.substring(SINK_KEY_INDICATOR.length);
|
||||
console.debug("Received sink:", sinkID);
|
||||
self.verify(sinkKey);
|
||||
} else
|
||||
self.shares[floID] = share.split("|");
|
||||
function collectAndCall(sinkID, callback) {
|
||||
if (!(callback instanceof Function))
|
||||
throw Error("callback should be a function");
|
||||
if (sinkID in shares_collected) //if there is already a collect for sinkID, then just add the callback to queue
|
||||
return shares_collected[sinkID].callbacks.push(callback);
|
||||
|
||||
shares_collected[sinkID] = {
|
||||
callbacks: [callback],
|
||||
shares: {}
|
||||
};
|
||||
for (let floID in connectedSlaves)
|
||||
requestShare(connectedSlaves[floID], sinkID);
|
||||
DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => {
|
||||
if (result.length)
|
||||
collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey))
|
||||
}).catch(error => console.error(error))
|
||||
}
|
||||
|
||||
function collectShares(floID, sinkID, share) {
|
||||
if (_mode !== MASTER_MODE)
|
||||
return console.warn("Not serving as master");
|
||||
if (!(sinkID in shares_collected))
|
||||
return console.error("Something is wrong! Slaves are sending sinkID thats not been collected");
|
||||
shares_collected[sinkID].shares[floID] = share.split("|");
|
||||
try {
|
||||
let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(self.shares)));
|
||||
console.debug("Retrived sink:", sinkID);
|
||||
self.verify(sinkKey);
|
||||
let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares)));
|
||||
if (floCrypto.verifyPrivKey(sinkKey, sinkID)) {
|
||||
console.log("Shares collected successfully for", sinkID);
|
||||
shares_collected[sinkID].callbacks.map(fn => fn(sinkKey));
|
||||
delete shares_collected[sinkID];
|
||||
}
|
||||
} catch {
|
||||
//Unable to retrive sink private key. Waiting for more shares! Do nothing for now
|
||||
};
|
||||
}
|
||||
|
||||
collectShares.verify = function(sinkKey) {
|
||||
const self = this;
|
||||
if (floCrypto.verifyPrivKey(sinkKey, self.sinkID)) {
|
||||
let sinkID = self.sinkID;
|
||||
console.log("Shares collected successfully for", sinkID);
|
||||
self.active = false;
|
||||
delete self.sinkID;
|
||||
delete self.shares;
|
||||
storeSink(sinkID, sinkKey);
|
||||
sendSharesToNodes(sinkID, generateShares(sinkKey));
|
||||
}
|
||||
}
|
||||
|
||||
function connectWS(floID) {
|
||||
let url = nodeURL[floID];
|
||||
return new Promise((resolve, reject) => {
|
||||
@ -165,7 +169,7 @@ function connectToMaster(i = 0, init = false) {
|
||||
function serveAsMaster(init) {
|
||||
console.debug('Starting master process');
|
||||
slave.stop();
|
||||
mod = MASTER_MODE;
|
||||
_mode = MASTER_MODE;
|
||||
informLiveNodes(init);
|
||||
app.resume();
|
||||
}
|
||||
@ -174,7 +178,7 @@ function serveAsSlave(ws, init) {
|
||||
console.debug('Starting slave process');
|
||||
app.pause();
|
||||
slave.start(ws, init);
|
||||
mod = SLAVE_MODE;
|
||||
_mode = SLAVE_MODE;
|
||||
}
|
||||
|
||||
function informLiveNodes(init) {
|
||||
@ -199,40 +203,15 @@ function informLiveNodes(init) {
|
||||
console.warn(`Node(${nodes[i]}) is offline`);
|
||||
if (init && flag)
|
||||
syncRequest();
|
||||
//Check if sinkKey or share available in DB
|
||||
DB.query("SELECT floID, share FROM sinkShares ORDER BY time_stored DESC LIMIT 1").then(result => {
|
||||
if (result.length) {
|
||||
let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey);
|
||||
if (share.startsWith(SINK_KEY_INDICATOR)) {
|
||||
//sinkKey is already present in DB, use it directly
|
||||
collectShares.active = false;
|
||||
global.sinkPrivKey = share.substring(SINK_KEY_INDICATOR.length);
|
||||
global.sinkID = floCrypto.getFloID(global.sinkPrivKey);
|
||||
if (global.sinkID != result[0].floID) {
|
||||
console.warn("sinkID and sinkKey in DB are not pair!");
|
||||
storeSink(global.sinkID, global.sinkPrivKey);
|
||||
}
|
||||
console.debug("Loaded sink:", global.sinkID);
|
||||
sendSharesToNodes(global.sinkID, generateShares(global.sinkPrivKey))
|
||||
} else {
|
||||
//Share is present in DB, try to collect remaining shares and retrive sinkKey
|
||||
collectShares.active = true;
|
||||
collectShares.retrive(global.myFloID, result[0].floID, share);
|
||||
}
|
||||
} else if (init) {
|
||||
if (flag) //Other nodes online, try to collect shares and retrive sinkKey
|
||||
collectShares.active = true;
|
||||
else {
|
||||
//No other node is active (possible 1st node to start exchange)
|
||||
console.log("Starting the exchange...");
|
||||
collectShares.active = false;
|
||||
let newSink = floCrypto.generateNewID();
|
||||
console.debug("Generated sink:", newSink.floID, newSink.privKey);
|
||||
storeSink(newSink.floID, newSink.privKey);
|
||||
sendSharesToNodes(newSink.floID, generateShares(newSink.privKey));
|
||||
}
|
||||
} else //This should not happen!
|
||||
console.error("Something is wrong! Node is not starting and no key/share present in DB");
|
||||
DB.query("SELECT floID, share FROM sinkShares ORDER BY time_stored DESC").then(result => {
|
||||
if (result.length)
|
||||
result.forEach(r => reconstructShares(r.floID));
|
||||
else if (!flag) {
|
||||
console.log("Starting the exchange...");
|
||||
let newSink = floCrypto.generateNewID();
|
||||
console.debug("Generated sink:", newSink.floID, newSink.privKey);
|
||||
sendSharesToNodes(newSink.floID, generateShares(newSink.privKey));
|
||||
}
|
||||
}).catch(error => console.error(error));
|
||||
});
|
||||
}
|
||||
@ -248,25 +227,42 @@ function syncRequest(cur = global.myFloID) {
|
||||
}
|
||||
|
||||
function updateMaster(floID) {
|
||||
let currentMaster = mod === MASTER_MODE ? global.myFloID : slave.masterWS.floID;
|
||||
let currentMaster = _mode === MASTER_MODE ? global.myFloID : slave.masterWS.floID;
|
||||
if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster))
|
||||
connectToMaster();
|
||||
}
|
||||
|
||||
function slaveConnect(floID, pubKey, ws) {
|
||||
function reconstructShares(sinkID) {
|
||||
if (_mode !== MASTER_MODE)
|
||||
return console.warn("Not serving as master");
|
||||
sinkList[sinkID] = null;
|
||||
collectAndCall(sinkID, sinkKey => sendSharesToNodes(sinkID, generateShares(sinkKey)));
|
||||
}
|
||||
|
||||
function slaveConnect(floID, pubKey, ws, sinks) {
|
||||
if (_mode !== MASTER_MODE)
|
||||
return console.warn("Not serving as master");
|
||||
ws.floID = floID;
|
||||
ws.pubKey = pubKey;
|
||||
connectedSlaves[floID] = ws;
|
||||
if (collectShares.active)
|
||||
ws.send(JSON.stringify({
|
||||
command: "SEND_SHARE",
|
||||
pubKey: global.myPubKey
|
||||
}));
|
||||
else if (nodeShares === null || //The 1st backup is connected
|
||||
Object.keys(connectedSlaves).length < Math.pow(SHARE_THRESHOLD, 2) * Object.keys(nodeShares).length) //re-calib shares for better
|
||||
|
||||
//Send shares if need to be delivered
|
||||
for (let sinkID in shares_pending)
|
||||
if (floID in shares_pending[sinkID])
|
||||
sendShare(ws, sinkID, shares_pending[floID]);
|
||||
//Request shares if any
|
||||
for (let sinkID in shares_collected)
|
||||
requestShare(ws, sinkID); //if (!(floID in shares_collected[sinkID].shares))
|
||||
//check if sinks in slaves are present
|
||||
if (Array.isArray(sinks))
|
||||
for (let sinkID of sinks)
|
||||
if (!(sinkID in sinkList))
|
||||
reconstructShares(sinkID);
|
||||
/*
|
||||
if (shares_pending === null || //The 1st backup is connected
|
||||
Object.keys(connectedSlaves).length < Math.pow(SHARE_THRESHOLD, 2) * Object.keys(shares_pending).length) //re-calib shares for better
|
||||
sendSharesToNodes(global.sinkID, generateShares(global.sinkPrivKey))
|
||||
else if (nodeShares[floID])
|
||||
sendShare(ws, global.sinkID, nodeShares[floID]);
|
||||
*/
|
||||
}
|
||||
|
||||
//Transmistter
|
||||
@ -302,10 +298,10 @@ function startBackupTransmitter(server) {
|
||||
updateMaster(request.floID);
|
||||
break;
|
||||
case "SLAVE_CONNECT":
|
||||
slaveConnect(request.floID, request.pubKey, ws);
|
||||
slaveConnect(request.floID, request.pubKey, ws, request.sinks);
|
||||
break;
|
||||
case "SINK_SHARE":
|
||||
collectShares.retrive(request.floID, request.sinkID, floCrypto.decryptData(request.share, global.myPrivKey))
|
||||
collectShares(request.floID, request.sinkID, floCrypto.decryptData(request.share, global.myPrivKey))
|
||||
default:
|
||||
invalid = "Invalid Request Type";
|
||||
}
|
||||
|
||||
@ -4,7 +4,6 @@ const {
|
||||
BACKUP_INTERVAL,
|
||||
BACKUP_SYNC_TIMEOUT,
|
||||
CHECKSUM_INTERVAL,
|
||||
SINK_KEY_INDICATOR,
|
||||
HASH_N_ROW
|
||||
} = require("../_constants")["backup"];
|
||||
|
||||
@ -20,19 +19,25 @@ function startSlaveProcess(ws, init) {
|
||||
//set masterWS
|
||||
ws.on('message', processDataFromMaster);
|
||||
masterWS = ws;
|
||||
//inform master
|
||||
let message = {
|
||||
floID: global.myFloID,
|
||||
pubKey: global.myPubKey,
|
||||
req_time: Date.now(),
|
||||
type: "SLAVE_CONNECT"
|
||||
}
|
||||
message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey);
|
||||
ws.send(JSON.stringify(message));
|
||||
//start sync
|
||||
if (init)
|
||||
requestInstance.open();
|
||||
intervalID = setInterval(() => requestInstance.open(), BACKUP_INTERVAL);
|
||||
let sinks_stored = [];
|
||||
DB.query("SELECT floID FROM sinkShares").then(result => {
|
||||
sinks_stored = result.map(r => r.floID);
|
||||
}).catch(error => console.error(error)).finally(_ => {
|
||||
//inform master
|
||||
let message = {
|
||||
floID: global.myFloID,
|
||||
pubKey: global.myPubKey,
|
||||
sinks: sinks_stored,
|
||||
req_time: Date.now(),
|
||||
type: "SLAVE_CONNECT"
|
||||
}
|
||||
message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey);
|
||||
ws.send(JSON.stringify(message));
|
||||
//start sync
|
||||
if (init)
|
||||
requestInstance.open();
|
||||
intervalID = setInterval(() => requestInstance.open(), BACKUP_INTERVAL);
|
||||
})
|
||||
}
|
||||
|
||||
function stopSlaveProcess() {
|
||||
@ -131,7 +136,7 @@ function processDataFromMaster(message) {
|
||||
storeSinkShare(message.sinkID, message.keyShare);
|
||||
break;
|
||||
case "SEND_SHARE":
|
||||
sendSinkShare(message.pubKey);
|
||||
sendSinkShare(message.sinkID, message.pubKey);
|
||||
break;
|
||||
case "REQUEST_ERROR":
|
||||
console.log(message.error);
|
||||
@ -144,23 +149,23 @@ function processDataFromMaster(message) {
|
||||
}
|
||||
}
|
||||
|
||||
function storeSinkShare(sinkID, keyShare) {
|
||||
let encryptedShare = Crypto.AES.encrypt(floCrypto.decryptData(keyShare, global.myPrivKey), global.myPrivKey);
|
||||
function storeSinkShare(sinkID, keyShare, decrypt = true) {
|
||||
if (decrypt)
|
||||
keyShare = floCrypto.decryptData(keyShare, global.myPrivKey)
|
||||
let encryptedShare = Crypto.AES.encrypt(keyShare, global.myPrivKey);
|
||||
console.log(Date.now(), '|sinkID:', sinkID, '|EnShare:', encryptedShare);
|
||||
DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?) ON DUPLICATE KEY UPDATE share=?", [sinkID, encryptedShare, encryptedShare])
|
||||
.then(_ => null).catch(error => console.error(error));
|
||||
}
|
||||
|
||||
function sendSinkShare(pubKey) {
|
||||
DB.query("SELECT floID, share FROM sinkShares ORDER BY time_stored DESC LIMIT 1").then(result => {
|
||||
function sendSinkShare(sinkID, pubKey) {
|
||||
DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => {
|
||||
if (!result.length)
|
||||
return console.warn("No key-shares in DB!");
|
||||
return console.warn(`key-shares for ${sinkID} not found in DB!`);
|
||||
let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey);
|
||||
if (share.startsWith(SINK_KEY_INDICATOR))
|
||||
console.warn("Key is stored instead of share!");
|
||||
let response = {
|
||||
type: "SINK_SHARE",
|
||||
sinkID: result[0].floID,
|
||||
sinkID: sinkID,
|
||||
share: floCrypto.encryptData(share, pubKey),
|
||||
floID: global.myFloID,
|
||||
pubKey: global.myPubKey,
|
||||
@ -425,5 +430,6 @@ module.exports = {
|
||||
},
|
||||
start: startSlaveProcess,
|
||||
stop: stopSlaveProcess,
|
||||
storeShare: storeSinkShare,
|
||||
syncRequest: ws => requestInstance.open(ws)
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user