From d7be0d414f48a6e0c945c281f801c2416ec1ec81 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Tue, 12 Nov 2019 21:15:14 +0530 Subject: [PATCH] Flow when a node dies Added the indications and processes when a noded goes offline: The previous nodes that the dead was serving ll connect to a next available node The immediate prev node informs the immediate next node to start serving the dead --- index.html | 228 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 158 insertions(+), 70 deletions(-) diff --git a/index.html b/index.html index dc1501b..3f9eb1c 100644 --- a/index.html +++ b/index.html @@ -35,7 +35,8 @@ defaultStorage : "General", serveList : [], backupStoredList : [], - supernodeConfig : {} + supernodeConfig : {}, + backupNodes : [] } @@ -6043,12 +6044,10 @@ setInterval(refreshBlockchainData, 3600000); floSupernode.initSupernode(serverPwd, myFloID).then(result => { console.log(result) - floSupernode.kBucket.getNextSupernode(myFloID).then(result => { - connectToBackupSupernode(result[0].floID).then(result => { - console.log(result) - reactor.dispatchEvent("indicate_sn_up",myFloID) - reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID}) - }) + connectToAllBackupSupernode().then(result => { + console.log(result) + reactor.dispatchEvent("indicate_sn_up",myFloID) + reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID}) }) }) }) @@ -6127,6 +6126,24 @@ }) } + function connectToAllBackupSupernode(curNode = myFloID, i = 0){ + return new Promise((resolve,reject) => { + if(i >= floGlobals.supernodeConfig.backupDepth) + resolve("Connected to all backup nodes") + else{ + floSupernode.kBucket.getNextSupernode(curNode).then(nextBackupNode => { + connectToBackupSupernode(nextBackupNode).then(backupConn => { + floGlobals.backupNodes[i] = backupConn + connectToAllBackupSupernode(backupConn.floID, i+1) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + }).catch(error => console.log(error)) + } + + }) + } + function connectToBackupSupernode(backupNodeID){ return new Promise((resolve,reject) => { initateBackupWebsocket(backupNodeID) @@ -6143,16 +6160,16 @@ function initateBackupWebsocket(backupNodeID){ return new Promise((resolve,reject) => { try{ - floGlobals.backupNode = { + var backupNode = { floID: backupNodeID, wsConn: new WebSocket("wss://" + floGlobals.supernodes[backupNodeID].uri + "/ws") } - floGlobals.backupNode.wsConn.onopen = (evt) => { - floGlobals.backupNode.wsConn.onmessage = (ev) => reactor.dispatchEvent("backup_node_message",ev); - floGlobals.backupNode.wsConn.onclose = (ev) => reactor.dispatchEvent("backup_node_disconnected",ev); - resolve(`connnected to ${backupNodeID} ws`); + backupNode.wsConn.onopen = (evt) => { + backupNode.wsConn.onmessage = (ev) => reactor.dispatchEvent("backup_node_message",{floID:backupNodeID,data:ev.data}); + backupNode.wsConn.onclose = (ev) => reactor.dispatchEvent("backup_node_disconnected",backupNodeID); + resolve(backupNode); } - floGlobals.backupNode.wsConn.onerror = (evt) => reject(`${backupNodeID} ws not found`); + backupNode.wsConn.onerror = (evt) => reject(`${backupNodeID} ws not found`); }catch(error){ reject(error.message) } @@ -6168,93 +6185,110 @@ console.log("backup_node_message"); //received message from backup node if(event.data == "$+") - reactor.dispatchEvent("backup_node_online",null); + reactor.dispatchEvent("backup_node_online",event.floID); else if (event.data == "$-") - reactor.dispatchEvent("backup_node_offline",null); + reactor.dispatchEvent("backup_node_offline",event.floID); else console.log(event.data) }) reactor.registerEvent("backup_node_disconnected"); - reactor.addEventListener("backup_node_disconnected", function (event) { + reactor.addEventListener("backup_node_disconnected", function (backupNodeID) { console.log("backup_node_disconnected"); //try to reconnect (usually disconnects due to timeout) - connectToBackupSupernode(floGlobals.backupNode.floID) + initateBackupWebsocket(backupNodeID) .then(result => console.log(result)) - .catch(error => console.log(error)) + .catch(error =>{ + console.log(error) + reactor.dispatchEvent("backup_node_offline",backupNodeID); + }) }) reactor.registerEvent("backup_node_online"); - reactor.addEventListener("backup_node_online", function (event) { + reactor.addEventListener("backup_node_online", function (backupNodeID) { console.log("backup_node_online"); //backup node is alive. //do nothing for now }) reactor.registerEvent("backup_node_offline"); - reactor.addEventListener("backup_node_offline", function (event) { + reactor.addEventListener("backup_node_offline", function (offlineNodeID) { console.log("backup_node_offline"); - //backup node is offline. - //get next available node - var offlineNodeID = floGlobals.backupNode.floID - floSupernode.kBucket.getNextSupernode(offlineNodeID).then(result => { - connectToBackupSupernode(result[0].floID).then(result => { - reactor.dispatchEvent("indicate_sn_down",offlineNodeID) - }) + //remove offline node and add the immediate next available node + var index = floGlobals.backupNodes.indexOf(offlineNodeID); + if (index !== -1) floGlobals.backupNodes.splice(index, 1); + var len = floGlobals.backupNodes.length + connectToAllBackupSupernode(floGlobals.backupNodes[len-1], len).then(result => { + console.log(result) + //inform the newly connected node to store backups of self + var sendData1 = { + from: myFloID, + backupMsg: { + type: "startBackupStore", + snfloID: myFloID, + time: Date.now() + }, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + floGlobals.backupNodes[len].wsConn.send(JSON.stringify(sendData1)) + //inform the immediate next node of the dead to start serving it + if(index == 0){ + var sendData2 = { + from: myFloID, + backupMsg: { + type: "startBackupServe", + snfloID: offlineNodeID, + time: Date.now() + }, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + floGlobals.backupNodes[0].wsConn.send(JSON.stringify(sendData2)) + } }) }) - reactor.registerEvent("indicate_sn_down"); - reactor.addEventListener("indicate_sn_down", function (snfloID) { - console.log("indicate_sn_down"); - //send message to backup's backup to server for backup node + reactor.registerEvent("send_message_to_backup_nodes"); + reactor.addEventListener("send_message_to_backup_nodes", function (backupMsg) { + console.log("send_message_to_backup_nodes"); + //send message to all connected backup nodes var sendData = { from: myFloID, - backupMsg: { - type: "supernodeDown", - snfloID: snfloID, - time: Date.now() - }, + backupMsg: backupMsg, sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) } - floGlobals.backupNode.wsConn.send(sendData) + for(var i = 0; i < floGlobals.backupNodes.length; i++) + floGlobals.backupNode[i].wsConn.send(sendData) }) reactor.registerEvent("indicate_sn_up"); reactor.addEventListener("indicate_sn_up", function (snfloID) { console.log("indicate_sn_up"); - //send message to backup's backup to server for backup node - var sendData = { - from: myFloID, - backupMsg: { - type: "supernodeUp", - snfloID: snfloID, - time: Date.now() - }, - sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + //send message to backup's backup to server for backup node (*to be rectified*) + var backupMsg = { + type: "supernodeUp", + snfloID: snfloID, + time: Date.now() } - floGlobals.backupNode.wsConn.send(sendData) + reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg) }) reactor.registerEvent("send_backup"); reactor.addEventListener("send_backup", function (data) { console.log("send_backup"); - var sendData = { - from: myFloID, - backupMsg : { - type: "backupData", - snfloID: data.snfloID, - key: data.k, - value: data.value - }, - sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + //send backup data to backup nodes + var backupMsg = { + type: "backupData", + snfloID: data.snfloID, + key: data.k, + value: data.value } - floGlobals.backupNode.wsConn.send(sendData) + reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg) }) reactor.registerEvent("send_stored_backup"); reactor.addEventListener("send_stored_backup", function (data) { console.log("send_stored_backup"); + //send stored backuped data to the requestor node try{ var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws") requestorWS.onopen = (evt) => { @@ -6263,7 +6297,7 @@ for(k in result){ var sendData = { from: myFloID, - backupMsg : { + backupMsg: { type: "backupData", snfloID: data.backupMsg.snfloID, key: k, @@ -6277,8 +6311,8 @@ }) } requestorWS.onmessage = (evt) => console.log(evt.data); - requestorWS.onclose = (evt) => console.log("Disconnected from "+data.from); - requestorWS.onerror = (evt) => console.log("Error connecting to "+data.from); + requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from); + requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from); }catch(error){ console.log(error.message) } @@ -6288,13 +6322,23 @@ reactor.addEventListener("backup_message_event", function (data) { console.log("backup_message_event"); if(floCrypto.verifySign(JSON.stringify(data.backupMsg), data.sign, floGlobals.supernodes[data.from].pubKey)){ - //Backup event messages + //Backup event messages (most crucial part) + if(data.backupMsg.type === "backupData" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID)) reactor.dispatchEvent("store_backup_data", data.backupMsg) - else if(data.backupMsg.type === "supernodeDown") + + else if(data.backupMsg.type === "startBackupServe") reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID) - else if(data.backupMsg.type === "supernodeUp") + + else if(data.backupMsg.type === "stopBackupServe") reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID) + + else if(data.backupMsg.type === "startBackupStore") + reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID) + + else if(data.backupMsg.type === "stopBackupStore") + reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID) + else if(data.backupMsg.type === "dataRequest" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID)) reactor.dispatchEvent("send_stored_backup", data) } @@ -6303,12 +6347,13 @@ reactor.registerEvent("request_data"); reactor.addEventListener("request_data", function (event) { console.log("request_data"); + //request the backup data try{ var holderWS = new WebSocket("wss://" + floGlobals.supernodes[event.holder].uri + "/ws") holderWS.onopen = (evt) => { var promises = [] for(var i=0; i < floGlobals.storageList.length; i++) - promises[i] = compactIDB.searchData(floGlobals.storageList[i],{lastOnly: true},`SN_${event.floID}`) + promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`) Promise.all(promises).then(results => { var lowerKey = {} for(var i=0; i < results.length; i++) @@ -6335,21 +6380,64 @@ reactor.registerEvent("store_backup_data"); reactor.addEventListener("store_backup_data", function (data) { console.log("store_backup_data"); + //store received backup data compactIDB.addData(floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage , data.value ,data.key,`SN_${data.snfloID}`) }) reactor.registerEvent("start_backup_serve"); - reactor.addEventListener("start_backup_serve", function (floID) { - console.log("start_backup_serve :"+floID); - floGlobals.serveList.push(floID) + reactor.addEventListener("start_backup_serve", function (snfloID) { + console.log("start_backup_serve :"+snfloID); + //start serving the dead node + if(!floGlobals.serveList.includes(snfloID)) + floGlobals.serveList.push(snfloID) + //indicate the last backup node to store the dead's backup too + var sendData = { + from: myFloID, + backupMsg: { + type: "startBackupStore", + snfloID: snfloID, + time: Date.now() + }, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + var lastIndex = floGlobals.backupNodes.length - 1 + floGlobals.backupNodes[lastIndex].wsConn.send(JSON.stringify(sendData)) }) reactor.registerEvent("stop_backup_serve"); - reactor.addEventListener("stop_backup_serve", function (floID) { - console.log("stop_backup_serve :"+floID); - floGlobals.serveList = floGlobals.serveList.filter(e => e !== floID) + reactor.addEventListener("stop_backup_serve", function (snfloID) { + console.log("stop_backup_serve :"+snfloID); + //stop serving the revived node + var index = floGlobals.serveList.indexOf(snfloID); + if (index !== -1) floGlobals.serveList.splice(index, 1); + //indicate the last backup node to stop storing the revived's backup + var sendData = { + from: myFloID, + backupMsg: { + type: "stopBackupStore", + snfloID: snfloID, + time: Date.now() + }, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + var lastIndex = floGlobals.backupNodes.length - 1 + floGlobals.backupNodes[lastIndex].wsConn.send(JSON.stringify(sendData)) }) + reactor.registerEvent("start_backup_store"); + reactor.addEventListener("start_backup_store", function (floID) { + console.log("start_backup_store :"+floID); + if(!floGlobals.backupStoredList.includes(floID)) + floGlobals.backupStoredList.push(floID) + + }) + + reactor.registerEvent("stop_backup_store"); + reactor.addEventListener("stop_backup_store", function (floID) { + console.log("stop_backup_store :"+floID); + var index = floGlobals.backupStoredList.indexOf(floID); + if (index !== -1) floGlobals.backupStoredList.splice(index, 1); + })