diff --git a/src/backup/head.js b/src/backup/head.js index e64bde5..7096bcf 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -140,11 +140,13 @@ function send_dataImmutable(timestamp, ws) { //Shares function generateNewSink() { let sink = floCrypto.generateNewID(); - let nextNodes = KB.nextNode(global.myFloID, null); - let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); sink.shares = {}; - for (let i in nextNodes) - sink.shares[nextNodes[i]] = shares[i]; + let nextNodes = nodeKBucket.nextNode(global.myFloID, null); + if (nextNodes.length) { + let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); + for (let i in nextNodes) + sink.shares[nextNodes[i]] = shares[i]; + } return sink; } @@ -212,20 +214,20 @@ function collectShares(floID, sinkID, share) { function connectWS(floID) { let url = nodeURL[floID]; return new Promise((resolve, reject) => { - const ws = new WebSocket(url); + const ws = new WebSocket('ws://' + url); ws.on('open', _ => resolve(ws)); - ws.on('error', _ => reject(error)); + ws.on('error', error => reject(error)); }) } -function connectToMaster(i = 0) { +function connectToMaster(i = 0, init = false) { if (i >= nodeList.length) { console.error("No master is found, and myFloID is not in list. This should not happen!"); process.exit(1); } let floID = nodeList[i]; if (floID === myFloID) - serveAsMaster(); + serveAsMaster(init); else connectWS(floID).then(ws => { ws.floID = floID; @@ -233,26 +235,27 @@ function connectToMaster(i = 0) { serveAsSlave(ws); }).catch(error => { console.log(`Node(${floID}) is offline`); - connectToMaster(i + 1) + connectToMaster(i + 1, init) }); } //Node becomes master -function serveAsMaster() { - app.resume(); +function serveAsMaster(init) { + console.debug('Starting master process'); slave.stop(); mod = MASTER_MODE; - informLiveNodes(); - collectShares.active = true; + informLiveNodes(init); + app.resume(); } function serveAsSlave(ws) { + console.debug('Starting slave process'); app.pause(); slave.start(ws); mod = SLAVE_MODE; } -function informLiveNodes() { +function informLiveNodes(init) { let message = { floID: global.myFloID, type: "UPDATE_MASTER", @@ -261,12 +264,34 @@ function informLiveNodes() { }; message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); message = JSON.stringify(message); - for (let n in nodeURL) - if (n !== global.myFloID) - connectWS(n).then(ws => { + let nodes = nodeList.filter(n => n !== global.myFloID); + Promise.allSettled(nodes.map(n => connectWS(n))).then(result => { + let flag = false; + for (let i in result) + if (result[i].status === "fulfilled") { + let ws = result[i].value; ws.send(message); ws.close(); - }).catch(error => console.warn(`Node ${n} is offline`)); + flag = true; + } else + console.warn(`Node(${nodes[i]}) is offline`); + if (init) { + if (flag === true) + collectShares.active = true; + else { + //No other node is active (possible 1st node to start exchange) + console.debug("Starting the exchange...") + let newSink = generateNewSink(); + storeSink(newSink.floID, newSink.privKey); + sendSharesToNodes(newSink.floID, newSink.shares); + } + } else { + collectShares.active = true; + DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1") + .then(result => collectShares(global.myFloID, result[0].floID, result[0].share)) + .catch(error => console.error(error)) + } + }); } function updateMaster(floID) { @@ -343,7 +368,7 @@ function startBackupTransmitter(server) { function initProcess(a) { app = a; startBackupTransmitter(app.server); - connectToMaster(); + connectToMaster(0, true); } module.exports = { @@ -352,6 +377,7 @@ module.exports = { nodeURL = list; nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); nodeList = nodeKBucket.order; + console.debug(nodeList); }, set DB(db) { DB = db;