From b56f90d29264d5227cfa23be03bcf70fbdf7d118 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 14 Nov 2019 02:12:16 +0530 Subject: [PATCH] Flow when a node revives Added the indications and processes when a noded comes back online: Connect to the backup nodes Inform the backup nodes that node is revived Serving node transfers the serve list to revived node Storing nodes informs their farest stored backup to connect to revived node --- index.html | 255 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 174 insertions(+), 81 deletions(-) diff --git a/index.html b/index.html index 3f9eb1c..d72dddb 100644 --- a/index.html +++ b/index.html @@ -5406,7 +5406,7 @@ requestor = request[0]; request = JSON.parse(request[1]); floSupernode.kBucket.determineClosestSupernode(data.receiverID).then(result => { - if(floGlobals.serveList.includes(result[0].floID)){ + if((myFloID === result[0].floID || floGlobals.serveList.includes(result[0].floID))){ var filterOptions = { lowerKey: request.lowerVectorClock, upperKey: request.upperVectorClock, @@ -5432,7 +5432,7 @@ data = JSON.parse(data) if(!data.backupMsg){ //Serving Users floSupernode.kBucket.determineClosestSupernode(data.receiverID).then(result => { - if (floGlobals.serveList.includes(result[0].floID) + if ((myFloID === result[0].floID || floGlobals.serveList.includes(result[0].floID)) && data.senderID == floCrypto.getFloIDfromPubkeyHex(data.pubKey) && floCrypto.verifySign(JSON.stringify(data.message), data.sign, data.pubKey)){ var key = `${Date.now()}_${data.senderID}` @@ -6046,8 +6046,7 @@ console.log(result) connectToAllBackupSupernode().then(result => { console.log(result) - reactor.dispatchEvent("indicate_sn_up",myFloID) - reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID}) + reactor.dispatchEvent("supernode_back_online",myFloID) }) }) }) @@ -6260,16 +6259,39 @@ floGlobals.backupNode[i].wsConn.send(sendData) }) - reactor.registerEvent("indicate_sn_up"); - reactor.addEventListener("indicate_sn_up", function (snfloID) { - console.log("indicate_sn_up"); - //send message to backup's backup to server for backup node (*to be rectified*) - var backupMsg = { - type: "supernodeUp", - snfloID: snfloID, - time: Date.now() + reactor.registerEvent("send_message_to_node"); + reactor.addEventListener("send_message_to_node", function (event) { + console.log("send_message_to_node"); + //send message to a supernode + try{ + var nodeWS = new WebSocket("wss://" + floGlobals.supernodes[event.snfloID].uri + "/ws") + nodeWS.onopen = (evt) => { + if(event.backupMsg){ + var sendData = { + from: myFloID, + backupMsg: event.backupMsg, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + nodeWS.send(JSON.stringify(this.sendData)) + } + if(event.bulkBackupMsg){ + for(var i=0; i < event.bulkBackupMsg.length; i++){ + var sendData = { + from: myFloID, + backupMsg: event.bulkBackupMsg[i], + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + nodeWS.send(JSON.stringify(this.sendData)) + } + } + + } + nodeWS.onmessage = (evt) => console.log(evt.data); + nodeWS.onclose = (evt) => console.log("Disconnected from " + event.holder); + nodeWS.onerror = (evt) => console.log("Error connecting to " + event.holder); + }catch(error){ + console.log(error.message) } - reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg) }) reactor.registerEvent("send_backup"); @@ -6289,32 +6311,34 @@ reactor.addEventListener("send_stored_backup", function (data) { console.log("send_stored_backup"); //send stored backuped data to the requestor node - try{ - var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws") - requestorWS.onopen = (evt) => { - floGlobals.storageList.forEach(obs => { - compactIDB.searchData(obs, {lowerKey: data.backupMsg.lowerKey[obs]}, `SN_${data.backupMsg.snfloID}`).then(result => { - for(k in result){ - var sendData = { - from: myFloID, - backupMsg: { - type: "backupData", - snfloID: data.backupMsg.snfloID, - key: k, - value: result[k] - }, - sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + if(data.backupMsg.snfloID === myFloID || floGlobals.backupStoredList.includes(data.backupMsg.snfloID)){ + try{ + var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws") + requestorWS.onopen = (evt) => { + floGlobals.storageList.forEach(obs => { + compactIDB.searchData(obs, {lowerKey: data.backupMsg.lowerKey[obs]}, `SN_${data.backupMsg.snfloID}`).then(result => { + for(k in result){ + var sendData = { + from: myFloID, + backupMsg: { + type: "backupData", + snfloID: data.backupMsg.snfloID, + key: k, + value: result[k] + }, + sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) + } + requestorWS.send(JSON.stringify(sendData)) } - requestorWS.send(JSON.stringify(sendData)) - } - }).catch(error => console.log(error)) - }) + }).catch(error => console.log(error)) + }) + } + requestorWS.onmessage = (evt) => console.log(evt.data); + requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from); + requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from); + }catch(error){ + console.log(error.message) } - requestorWS.onmessage = (evt) => console.log(evt.data); - requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from); - requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from); - }catch(error){ - console.log(error.message) } }) @@ -6324,23 +6348,31 @@ if(floCrypto.verifySign(JSON.stringify(data.backupMsg), data.sign, floGlobals.supernodes[data.from].pubKey)){ //Backup event messages (most crucial part) - if(data.backupMsg.type === "backupData" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID)) - reactor.dispatchEvent("store_backup_data", data.backupMsg) - - else if(data.backupMsg.type === "startBackupServe") - reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID) - - else if(data.backupMsg.type === "stopBackupServe") - reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID) - - else if(data.backupMsg.type === "startBackupStore") - reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID) - - else if(data.backupMsg.type === "stopBackupStore") - reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID) - - else if(data.backupMsg.type === "dataRequest" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID)) - reactor.dispatchEvent("send_stored_backup", data) + switch(data.backupMsg.type){ + case "backupData": + reactor.dispatchEvent("store_backup_data", data.backupMsg) + break; + case "supernodeUp": + reactor.dispatchEvent("supernode_back_online", data.backupMsg.snfloID) + break; + case "startBackupServe": + reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID) + break; + case "stopBackupServe": + reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID) + break; + case "startBackupStore": + reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID) + break; + case "stopBackupStore": + reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID) + break; + case "dataRequest": + reactor.dispatchEvent("send_stored_backup", data) + break; + default: + console.log(data.backupMsg) + } } }) @@ -6348,42 +6380,103 @@ reactor.addEventListener("request_data", function (event) { console.log("request_data"); //request the backup data - try{ - var holderWS = new WebSocket("wss://" + floGlobals.supernodes[event.holder].uri + "/ws") - holderWS.onopen = (evt) => { - var promises = [] - for(var i=0; i < floGlobals.storageList.length; i++) - promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`) - Promise.all(promises).then(results => { - var lowerKey = {} - for(var i=0; i < results.length; i++) - for(key in results[i]) - lowerKey[floGlobals.storageList[i]] = key - var sendData = { - from: myFloID, - backupMsg = { - snfloID: event.floID, - lowerKey: lowerKey - }, - sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey) - } - }) + var promises = [] + for(var i=0; i < floGlobals.storageList.length; i++) + promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`) + Promise.all(promises).then(results => { + var lowerKey = {} + for(var i=0; i < results.length; i++) + for(key in results[i]) + lowerKey[floGlobals.storageList[i]] = key + var backupMsg = { + type: "dataRequest", + snfloID: event.floID, + lowerKey: lowerKey, + time: Date.now() } - holderWS.onmessage = (evt) => console.log(evt.data); - holderWS.onclose = (evt) => console.log("Disconnected from " + event.holder); - holderWS.onerror = (evt) => console.log("Error connecting to " + event.holder); - }catch(error){ - console.log(error.message) - } + reactor.dispatchEvent("send_message_to_node", {snfloID:event.holder, backupMsg:backupMsg}) + }) }) reactor.registerEvent("store_backup_data"); reactor.addEventListener("store_backup_data", function (data) { console.log("store_backup_data"); //store received backup data - compactIDB.addData(floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage , data.value ,data.key,`SN_${data.snfloID}`) + if(data.backupMsg.snfloID === myFloID || floGlobals.backupStoredList.includes(data.backupMsg.snfloID)){ + compactIDB.addData( + floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage, + data.value, data.key, `SN_${data.snfloID}` + ) + } }) + reactor.registerEvent("indicate_supernode_up"); + reactor.addEventListener("indicate_supernode_up", function (snfloID) { + console.log("indicate_supernode_up"); + //send message to backup's backup to server for backup node (*to be rectified*) + var backupMsg = { + type: "supernodeUp", + snfloID: snfloID, + time: Date.now() + } + reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg) + }) + + reactor.registerEvent("supernode_back_online"); + reactor.addEventListener("supernode_back_online", function (snfloID) { + console.log("supernode_back_online"); + if(floGlobals.serveList.includes(snfloID)){ + //inform the revived node to serve the other dead nodes + var kBucketArray = floSupernode.kBucket.supernodeKBucket.toArray + for(var i=0; i < floGlobals.serveList.length; i++) + if(kBucketArray.indexOf(floGlobals.serveList[i]) <= kBucketArray.indexOf(snfloID)){ + var backupMsg = { + type: "startBackupServe", + snfloID: floGlobals.serveList[i], + time: Date.now() + } + reactor.dispatchEvent("stop_backup_serve", floGlobals.serveList[i]) + reactor.dispatchEvent("send_message_to_node", {snfloID:snfloID, backupMsg:backupMsg}) + i--; //reduce iterator as an element is removed + } + } + if(floGlobals.backupStoredList.includes(snfloID)){ + var lastBackup = floGlobals.backupStoredList.pop() + //inform the revived node to store the backup + var backupMsg1 = { + type: "startBackupStore", + snfloID:lastBackup, + time: Date.now() + } + reactor.dispatchEvent("send_message_to_node", {snfloID:snfloID, backupMsg:backupMsg1}) + //inform the backup node that a node is revived + var backupMsg2 = { + type: "supernodeUp", + snfloID:snfloID, + time: Date.now() + } + reactor.dispatchEvent("send_message_to_node", {snfloID:lastBackup, backupMsg:backupMsg2}) + } else { + //connect to the revived node as backup if needed + var kBucketArray = floSupernode.kBucket.supernodeKBucket.toArray + var index = false + for(var i = 0; i < floGlobals.backupNodes.length; i++){ + if(kBucketArray.indexOf(floGlobals.backupNodes[i].floID) == kBucketArray.indexOf(snfloID)) //revived node is already connected + break; + else if(kBucketArray.indexOf(floGlobals.backupNodes[i].floID) > kBucketArray.indexOf(snfloID)){ + index = i; + break; + } + } + if(index){ + initateBackupWebsocket(snfloID).then(result => { + floGlobals.backupNodes.splice(index,0,result) // add revived node as backup node + floGlobals.backupNodes.pop() // remove the last extra backup node + }).catch(error => console.log(error)) + } + } + }) + reactor.registerEvent("start_backup_serve"); reactor.addEventListener("start_backup_serve", function (snfloID) { console.log("start_backup_serve :"+snfloID);