Process for exception cases
Case: No other node active when self node is init - Start the exchange system. Case: When node becomes master when prev node goes down - Add self share to collectShare - collect shares from other node to re-construct sink private key - Fixed some minor bugs
This commit is contained in:
parent
52a1b3077d
commit
11b99c3e76
@ -140,11 +140,13 @@ function send_dataImmutable(timestamp, ws) {
|
||||
//Shares
|
||||
function generateNewSink() {
|
||||
let sink = floCrypto.generateNewID();
|
||||
let nextNodes = KB.nextNode(global.myFloID, null);
|
||||
let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold));
|
||||
sink.shares = {};
|
||||
for (let i in nextNodes)
|
||||
sink.shares[nextNodes[i]] = shares[i];
|
||||
let nextNodes = nodeKBucket.nextNode(global.myFloID, null);
|
||||
if (nextNodes.length) {
|
||||
let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold));
|
||||
for (let i in nextNodes)
|
||||
sink.shares[nextNodes[i]] = shares[i];
|
||||
}
|
||||
return sink;
|
||||
}
|
||||
|
||||
@ -212,20 +214,20 @@ function collectShares(floID, sinkID, share) {
|
||||
function connectWS(floID) {
|
||||
let url = nodeURL[floID];
|
||||
return new Promise((resolve, reject) => {
|
||||
const ws = new WebSocket(url);
|
||||
const ws = new WebSocket('ws://' + url);
|
||||
ws.on('open', _ => resolve(ws));
|
||||
ws.on('error', _ => reject(error));
|
||||
ws.on('error', error => reject(error));
|
||||
})
|
||||
}
|
||||
|
||||
function connectToMaster(i = 0) {
|
||||
function connectToMaster(i = 0, init = false) {
|
||||
if (i >= nodeList.length) {
|
||||
console.error("No master is found, and myFloID is not in list. This should not happen!");
|
||||
process.exit(1);
|
||||
}
|
||||
let floID = nodeList[i];
|
||||
if (floID === myFloID)
|
||||
serveAsMaster();
|
||||
serveAsMaster(init);
|
||||
else
|
||||
connectWS(floID).then(ws => {
|
||||
ws.floID = floID;
|
||||
@ -233,26 +235,27 @@ function connectToMaster(i = 0) {
|
||||
serveAsSlave(ws);
|
||||
}).catch(error => {
|
||||
console.log(`Node(${floID}) is offline`);
|
||||
connectToMaster(i + 1)
|
||||
connectToMaster(i + 1, init)
|
||||
});
|
||||
}
|
||||
|
||||
//Node becomes master
|
||||
function serveAsMaster() {
|
||||
app.resume();
|
||||
function serveAsMaster(init) {
|
||||
console.debug('Starting master process');
|
||||
slave.stop();
|
||||
mod = MASTER_MODE;
|
||||
informLiveNodes();
|
||||
collectShares.active = true;
|
||||
informLiveNodes(init);
|
||||
app.resume();
|
||||
}
|
||||
|
||||
function serveAsSlave(ws) {
|
||||
console.debug('Starting slave process');
|
||||
app.pause();
|
||||
slave.start(ws);
|
||||
mod = SLAVE_MODE;
|
||||
}
|
||||
|
||||
function informLiveNodes() {
|
||||
function informLiveNodes(init) {
|
||||
let message = {
|
||||
floID: global.myFloID,
|
||||
type: "UPDATE_MASTER",
|
||||
@ -261,12 +264,34 @@ function informLiveNodes() {
|
||||
};
|
||||
message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey);
|
||||
message = JSON.stringify(message);
|
||||
for (let n in nodeURL)
|
||||
if (n !== global.myFloID)
|
||||
connectWS(n).then(ws => {
|
||||
let nodes = nodeList.filter(n => n !== global.myFloID);
|
||||
Promise.allSettled(nodes.map(n => connectWS(n))).then(result => {
|
||||
let flag = false;
|
||||
for (let i in result)
|
||||
if (result[i].status === "fulfilled") {
|
||||
let ws = result[i].value;
|
||||
ws.send(message);
|
||||
ws.close();
|
||||
}).catch(error => console.warn(`Node ${n} is offline`));
|
||||
flag = true;
|
||||
} else
|
||||
console.warn(`Node(${nodes[i]}) is offline`);
|
||||
if (init) {
|
||||
if (flag === true)
|
||||
collectShares.active = true;
|
||||
else {
|
||||
//No other node is active (possible 1st node to start exchange)
|
||||
console.debug("Starting the exchange...")
|
||||
let newSink = generateNewSink();
|
||||
storeSink(newSink.floID, newSink.privKey);
|
||||
sendSharesToNodes(newSink.floID, newSink.shares);
|
||||
}
|
||||
} else {
|
||||
collectShares.active = true;
|
||||
DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1")
|
||||
.then(result => collectShares(global.myFloID, result[0].floID, result[0].share))
|
||||
.catch(error => console.error(error))
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function updateMaster(floID) {
|
||||
@ -343,7 +368,7 @@ function startBackupTransmitter(server) {
|
||||
function initProcess(a) {
|
||||
app = a;
|
||||
startBackupTransmitter(app.server);
|
||||
connectToMaster();
|
||||
connectToMaster(0, true);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
@ -352,6 +377,7 @@ module.exports = {
|
||||
nodeURL = list;
|
||||
nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL));
|
||||
nodeList = nodeKBucket.order;
|
||||
console.debug(nodeList);
|
||||
},
|
||||
set DB(db) {
|
||||
DB = db;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user