Flow when a node revives

Added the indications and processes when a noded comes back online:
Connect to the backup nodes
Inform the backup nodes that node is revived
Serving node transfers the serve list to revived node
Storing nodes informs their farest stored backup to connect to revived node
This commit is contained in:
sairajzero 2019-11-14 02:12:16 +05:30
parent d7be0d414f
commit b56f90d292

View File

@ -5406,7 +5406,7 @@
requestor = request[0];
request = JSON.parse(request[1]);
floSupernode.kBucket.determineClosestSupernode(data.receiverID).then(result => {
if(floGlobals.serveList.includes(result[0].floID)){
if((myFloID === result[0].floID || floGlobals.serveList.includes(result[0].floID))){
var filterOptions = {
lowerKey: request.lowerVectorClock,
upperKey: request.upperVectorClock,
@ -5432,7 +5432,7 @@
data = JSON.parse(data)
if(!data.backupMsg){ //Serving Users
floSupernode.kBucket.determineClosestSupernode(data.receiverID).then(result => {
if (floGlobals.serveList.includes(result[0].floID)
if ((myFloID === result[0].floID || floGlobals.serveList.includes(result[0].floID))
&& data.senderID == floCrypto.getFloIDfromPubkeyHex(data.pubKey)
&& floCrypto.verifySign(JSON.stringify(data.message), data.sign, data.pubKey)){
var key = `${Date.now()}_${data.senderID}`
@ -6046,8 +6046,7 @@
console.log(result)
connectToAllBackupSupernode().then(result => {
console.log(result)
reactor.dispatchEvent("indicate_sn_up",myFloID)
reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID})
reactor.dispatchEvent("supernode_back_online",myFloID)
})
})
})
@ -6260,16 +6259,39 @@
floGlobals.backupNode[i].wsConn.send(sendData)
})
reactor.registerEvent("indicate_sn_up");
reactor.addEventListener("indicate_sn_up", function (snfloID) {
console.log("indicate_sn_up");
//send message to backup's backup to server for backup node (*to be rectified*)
var backupMsg = {
type: "supernodeUp",
snfloID: snfloID,
time: Date.now()
reactor.registerEvent("send_message_to_node");
reactor.addEventListener("send_message_to_node", function (event) {
console.log("send_message_to_node");
//send message to a supernode
try{
var nodeWS = new WebSocket("wss://" + floGlobals.supernodes[event.snfloID].uri + "/ws")
nodeWS.onopen = (evt) => {
if(event.backupMsg){
var sendData = {
from: myFloID,
backupMsg: event.backupMsg,
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
nodeWS.send(JSON.stringify(this.sendData))
}
if(event.bulkBackupMsg){
for(var i=0; i < event.bulkBackupMsg.length; i++){
var sendData = {
from: myFloID,
backupMsg: event.bulkBackupMsg[i],
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
nodeWS.send(JSON.stringify(this.sendData))
}
}
}
nodeWS.onmessage = (evt) => console.log(evt.data);
nodeWS.onclose = (evt) => console.log("Disconnected from " + event.holder);
nodeWS.onerror = (evt) => console.log("Error connecting to " + event.holder);
}catch(error){
console.log(error.message)
}
reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg)
})
reactor.registerEvent("send_backup");
@ -6289,32 +6311,34 @@
reactor.addEventListener("send_stored_backup", function (data) {
console.log("send_stored_backup");
//send stored backuped data to the requestor node
try{
var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws")
requestorWS.onopen = (evt) => {
floGlobals.storageList.forEach(obs => {
compactIDB.searchData(obs, {lowerKey: data.backupMsg.lowerKey[obs]}, `SN_${data.backupMsg.snfloID}`).then(result => {
for(k in result){
var sendData = {
from: myFloID,
backupMsg: {
type: "backupData",
snfloID: data.backupMsg.snfloID,
key: k,
value: result[k]
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
if(data.backupMsg.snfloID === myFloID || floGlobals.backupStoredList.includes(data.backupMsg.snfloID)){
try{
var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws")
requestorWS.onopen = (evt) => {
floGlobals.storageList.forEach(obs => {
compactIDB.searchData(obs, {lowerKey: data.backupMsg.lowerKey[obs]}, `SN_${data.backupMsg.snfloID}`).then(result => {
for(k in result){
var sendData = {
from: myFloID,
backupMsg: {
type: "backupData",
snfloID: data.backupMsg.snfloID,
key: k,
value: result[k]
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
requestorWS.send(JSON.stringify(sendData))
}
requestorWS.send(JSON.stringify(sendData))
}
}).catch(error => console.log(error))
})
}).catch(error => console.log(error))
})
}
requestorWS.onmessage = (evt) => console.log(evt.data);
requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from);
requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from);
}catch(error){
console.log(error.message)
}
requestorWS.onmessage = (evt) => console.log(evt.data);
requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from);
requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from);
}catch(error){
console.log(error.message)
}
})
@ -6324,23 +6348,31 @@
if(floCrypto.verifySign(JSON.stringify(data.backupMsg), data.sign, floGlobals.supernodes[data.from].pubKey)){
//Backup event messages (most crucial part)
if(data.backupMsg.type === "backupData" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID))
reactor.dispatchEvent("store_backup_data", data.backupMsg)
else if(data.backupMsg.type === "startBackupServe")
reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID)
else if(data.backupMsg.type === "stopBackupServe")
reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID)
else if(data.backupMsg.type === "startBackupStore")
reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID)
else if(data.backupMsg.type === "stopBackupStore")
reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID)
else if(data.backupMsg.type === "dataRequest" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID))
reactor.dispatchEvent("send_stored_backup", data)
switch(data.backupMsg.type){
case "backupData":
reactor.dispatchEvent("store_backup_data", data.backupMsg)
break;
case "supernodeUp":
reactor.dispatchEvent("supernode_back_online", data.backupMsg.snfloID)
break;
case "startBackupServe":
reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID)
break;
case "stopBackupServe":
reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID)
break;
case "startBackupStore":
reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID)
break;
case "stopBackupStore":
reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID)
break;
case "dataRequest":
reactor.dispatchEvent("send_stored_backup", data)
break;
default:
console.log(data.backupMsg)
}
}
})
@ -6348,42 +6380,103 @@
reactor.addEventListener("request_data", function (event) {
console.log("request_data");
//request the backup data
try{
var holderWS = new WebSocket("wss://" + floGlobals.supernodes[event.holder].uri + "/ws")
holderWS.onopen = (evt) => {
var promises = []
for(var i=0; i < floGlobals.storageList.length; i++)
promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`)
Promise.all(promises).then(results => {
var lowerKey = {}
for(var i=0; i < results.length; i++)
for(key in results[i])
lowerKey[floGlobals.storageList[i]] = key
var sendData = {
from: myFloID,
backupMsg = {
snfloID: event.floID,
lowerKey: lowerKey
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
})
var promises = []
for(var i=0; i < floGlobals.storageList.length; i++)
promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`)
Promise.all(promises).then(results => {
var lowerKey = {}
for(var i=0; i < results.length; i++)
for(key in results[i])
lowerKey[floGlobals.storageList[i]] = key
var backupMsg = {
type: "dataRequest",
snfloID: event.floID,
lowerKey: lowerKey,
time: Date.now()
}
holderWS.onmessage = (evt) => console.log(evt.data);
holderWS.onclose = (evt) => console.log("Disconnected from " + event.holder);
holderWS.onerror = (evt) => console.log("Error connecting to " + event.holder);
}catch(error){
console.log(error.message)
}
reactor.dispatchEvent("send_message_to_node", {snfloID:event.holder, backupMsg:backupMsg})
})
})
reactor.registerEvent("store_backup_data");
reactor.addEventListener("store_backup_data", function (data) {
console.log("store_backup_data");
//store received backup data
compactIDB.addData(floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage , data.value ,data.key,`SN_${data.snfloID}`)
if(data.backupMsg.snfloID === myFloID || floGlobals.backupStoredList.includes(data.backupMsg.snfloID)){
compactIDB.addData(
floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage,
data.value, data.key, `SN_${data.snfloID}`
)
}
})
reactor.registerEvent("indicate_supernode_up");
reactor.addEventListener("indicate_supernode_up", function (snfloID) {
console.log("indicate_supernode_up");
//send message to backup's backup to server for backup node (*to be rectified*)
var backupMsg = {
type: "supernodeUp",
snfloID: snfloID,
time: Date.now()
}
reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg)
})
reactor.registerEvent("supernode_back_online");
reactor.addEventListener("supernode_back_online", function (snfloID) {
console.log("supernode_back_online");
if(floGlobals.serveList.includes(snfloID)){
//inform the revived node to serve the other dead nodes
var kBucketArray = floSupernode.kBucket.supernodeKBucket.toArray
for(var i=0; i < floGlobals.serveList.length; i++)
if(kBucketArray.indexOf(floGlobals.serveList[i]) <= kBucketArray.indexOf(snfloID)){
var backupMsg = {
type: "startBackupServe",
snfloID: floGlobals.serveList[i],
time: Date.now()
}
reactor.dispatchEvent("stop_backup_serve", floGlobals.serveList[i])
reactor.dispatchEvent("send_message_to_node", {snfloID:snfloID, backupMsg:backupMsg})
i--; //reduce iterator as an element is removed
}
}
if(floGlobals.backupStoredList.includes(snfloID)){
var lastBackup = floGlobals.backupStoredList.pop()
//inform the revived node to store the backup
var backupMsg1 = {
type: "startBackupStore",
snfloID:lastBackup,
time: Date.now()
}
reactor.dispatchEvent("send_message_to_node", {snfloID:snfloID, backupMsg:backupMsg1})
//inform the backup node that a node is revived
var backupMsg2 = {
type: "supernodeUp",
snfloID:snfloID,
time: Date.now()
}
reactor.dispatchEvent("send_message_to_node", {snfloID:lastBackup, backupMsg:backupMsg2})
} else {
//connect to the revived node as backup if needed
var kBucketArray = floSupernode.kBucket.supernodeKBucket.toArray
var index = false
for(var i = 0; i < floGlobals.backupNodes.length; i++){
if(kBucketArray.indexOf(floGlobals.backupNodes[i].floID) == kBucketArray.indexOf(snfloID)) //revived node is already connected
break;
else if(kBucketArray.indexOf(floGlobals.backupNodes[i].floID) > kBucketArray.indexOf(snfloID)){
index = i;
break;
}
}
if(index){
initateBackupWebsocket(snfloID).then(result => {
floGlobals.backupNodes.splice(index,0,result) // add revived node as backup node
floGlobals.backupNodes.pop() // remove the last extra backup node
}).catch(error => console.log(error))
}
}
})
reactor.registerEvent("start_backup_serve");
reactor.addEventListener("start_backup_serve", function (snfloID) {
console.log("start_backup_serve :"+snfloID);