Flow when a node dies

Added the indications and processes when a noded goes offline:
The previous nodes that the dead was serving ll connect to a next available node
The immediate prev node informs the immediate next node to start serving the dead
This commit is contained in:
sairajzero 2019-11-12 21:15:14 +05:30
parent f7a14aa304
commit d7be0d414f

View File

@ -35,7 +35,8 @@
defaultStorage : "General",
serveList : [],
backupStoredList : [],
supernodeConfig : {}
supernodeConfig : {},
backupNodes : []
}
</script>
@ -6043,12 +6044,10 @@
setInterval(refreshBlockchainData, 3600000);
floSupernode.initSupernode(serverPwd, myFloID).then(result => {
console.log(result)
floSupernode.kBucket.getNextSupernode(myFloID).then(result => {
connectToBackupSupernode(result[0].floID).then(result => {
console.log(result)
reactor.dispatchEvent("indicate_sn_up",myFloID)
reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID})
})
connectToAllBackupSupernode().then(result => {
console.log(result)
reactor.dispatchEvent("indicate_sn_up",myFloID)
reactor.dispatchEvent("request_data",{floid:myFloID, holder:floGlobals.backupNode.floID})
})
})
})
@ -6127,6 +6126,24 @@
})
}
function connectToAllBackupSupernode(curNode = myFloID, i = 0){
return new Promise((resolve,reject) => {
if(i >= floGlobals.supernodeConfig.backupDepth)
resolve("Connected to all backup nodes")
else{
floSupernode.kBucket.getNextSupernode(curNode).then(nextBackupNode => {
connectToBackupSupernode(nextBackupNode).then(backupConn => {
floGlobals.backupNodes[i] = backupConn
connectToAllBackupSupernode(backupConn.floID, i+1)
.then(result => resolve(result))
.catch(error => reject(error))
})
}).catch(error => console.log(error))
}
})
}
function connectToBackupSupernode(backupNodeID){
return new Promise((resolve,reject) => {
initateBackupWebsocket(backupNodeID)
@ -6143,16 +6160,16 @@
function initateBackupWebsocket(backupNodeID){
return new Promise((resolve,reject) => {
try{
floGlobals.backupNode = {
var backupNode = {
floID: backupNodeID,
wsConn: new WebSocket("wss://" + floGlobals.supernodes[backupNodeID].uri + "/ws")
}
floGlobals.backupNode.wsConn.onopen = (evt) => {
floGlobals.backupNode.wsConn.onmessage = (ev) => reactor.dispatchEvent("backup_node_message",ev);
floGlobals.backupNode.wsConn.onclose = (ev) => reactor.dispatchEvent("backup_node_disconnected",ev);
resolve(`connnected to ${backupNodeID} ws`);
backupNode.wsConn.onopen = (evt) => {
backupNode.wsConn.onmessage = (ev) => reactor.dispatchEvent("backup_node_message",{floID:backupNodeID,data:ev.data});
backupNode.wsConn.onclose = (ev) => reactor.dispatchEvent("backup_node_disconnected",backupNodeID);
resolve(backupNode);
}
floGlobals.backupNode.wsConn.onerror = (evt) => reject(`${backupNodeID} ws not found`);
backupNode.wsConn.onerror = (evt) => reject(`${backupNodeID} ws not found`);
}catch(error){
reject(error.message)
}
@ -6168,93 +6185,110 @@
console.log("backup_node_message");
//received message from backup node
if(event.data == "$+")
reactor.dispatchEvent("backup_node_online",null);
reactor.dispatchEvent("backup_node_online",event.floID);
else if (event.data == "$-")
reactor.dispatchEvent("backup_node_offline",null);
reactor.dispatchEvent("backup_node_offline",event.floID);
else
console.log(event.data)
})
reactor.registerEvent("backup_node_disconnected");
reactor.addEventListener("backup_node_disconnected", function (event) {
reactor.addEventListener("backup_node_disconnected", function (backupNodeID) {
console.log("backup_node_disconnected");
//try to reconnect (usually disconnects due to timeout)
connectToBackupSupernode(floGlobals.backupNode.floID)
initateBackupWebsocket(backupNodeID)
.then(result => console.log(result))
.catch(error => console.log(error))
.catch(error =>{
console.log(error)
reactor.dispatchEvent("backup_node_offline",backupNodeID);
})
})
reactor.registerEvent("backup_node_online");
reactor.addEventListener("backup_node_online", function (event) {
reactor.addEventListener("backup_node_online", function (backupNodeID) {
console.log("backup_node_online");
//backup node is alive.
//do nothing for now
})
reactor.registerEvent("backup_node_offline");
reactor.addEventListener("backup_node_offline", function (event) {
reactor.addEventListener("backup_node_offline", function (offlineNodeID) {
console.log("backup_node_offline");
//backup node is offline.
//get next available node
var offlineNodeID = floGlobals.backupNode.floID
floSupernode.kBucket.getNextSupernode(offlineNodeID).then(result => {
connectToBackupSupernode(result[0].floID).then(result => {
reactor.dispatchEvent("indicate_sn_down",offlineNodeID)
})
//remove offline node and add the immediate next available node
var index = floGlobals.backupNodes.indexOf(offlineNodeID);
if (index !== -1) floGlobals.backupNodes.splice(index, 1);
var len = floGlobals.backupNodes.length
connectToAllBackupSupernode(floGlobals.backupNodes[len-1], len).then(result => {
console.log(result)
//inform the newly connected node to store backups of self
var sendData1 = {
from: myFloID,
backupMsg: {
type: "startBackupStore",
snfloID: myFloID,
time: Date.now()
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
floGlobals.backupNodes[len].wsConn.send(JSON.stringify(sendData1))
//inform the immediate next node of the dead to start serving it
if(index == 0){
var sendData2 = {
from: myFloID,
backupMsg: {
type: "startBackupServe",
snfloID: offlineNodeID,
time: Date.now()
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
floGlobals.backupNodes[0].wsConn.send(JSON.stringify(sendData2))
}
})
})
reactor.registerEvent("indicate_sn_down");
reactor.addEventListener("indicate_sn_down", function (snfloID) {
console.log("indicate_sn_down");
//send message to backup's backup to server for backup node
reactor.registerEvent("send_message_to_backup_nodes");
reactor.addEventListener("send_message_to_backup_nodes", function (backupMsg) {
console.log("send_message_to_backup_nodes");
//send message to all connected backup nodes
var sendData = {
from: myFloID,
backupMsg: {
type: "supernodeDown",
snfloID: snfloID,
time: Date.now()
},
backupMsg: backupMsg,
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
floGlobals.backupNode.wsConn.send(sendData)
for(var i = 0; i < floGlobals.backupNodes.length; i++)
floGlobals.backupNode[i].wsConn.send(sendData)
})
reactor.registerEvent("indicate_sn_up");
reactor.addEventListener("indicate_sn_up", function (snfloID) {
console.log("indicate_sn_up");
//send message to backup's backup to server for backup node
var sendData = {
from: myFloID,
backupMsg: {
type: "supernodeUp",
snfloID: snfloID,
time: Date.now()
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
//send message to backup's backup to server for backup node (*to be rectified*)
var backupMsg = {
type: "supernodeUp",
snfloID: snfloID,
time: Date.now()
}
floGlobals.backupNode.wsConn.send(sendData)
reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg)
})
reactor.registerEvent("send_backup");
reactor.addEventListener("send_backup", function (data) {
console.log("send_backup");
var sendData = {
from: myFloID,
backupMsg : {
type: "backupData",
snfloID: data.snfloID,
key: data.k,
value: data.value
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
//send backup data to backup nodes
var backupMsg = {
type: "backupData",
snfloID: data.snfloID,
key: data.k,
value: data.value
}
floGlobals.backupNode.wsConn.send(sendData)
reactor.dispatchEvent("send_message_to_backup_nodes", backupMsg)
})
reactor.registerEvent("send_stored_backup");
reactor.addEventListener("send_stored_backup", function (data) {
console.log("send_stored_backup");
//send stored backuped data to the requestor node
try{
var requestorWS = new WebSocket("wss://" + floGlobals.supernodes[data.from].uri + "/ws")
requestorWS.onopen = (evt) => {
@ -6263,7 +6297,7 @@
for(k in result){
var sendData = {
from: myFloID,
backupMsg : {
backupMsg: {
type: "backupData",
snfloID: data.backupMsg.snfloID,
key: k,
@ -6277,8 +6311,8 @@
})
}
requestorWS.onmessage = (evt) => console.log(evt.data);
requestorWS.onclose = (evt) => console.log("Disconnected from "+data.from);
requestorWS.onerror = (evt) => console.log("Error connecting to "+data.from);
requestorWS.onclose = (evt) => console.log("Disconnected from " + data.from);
requestorWS.onerror = (evt) => console.log("Error connecting to " + data.from);
}catch(error){
console.log(error.message)
}
@ -6288,13 +6322,23 @@
reactor.addEventListener("backup_message_event", function (data) {
console.log("backup_message_event");
if(floCrypto.verifySign(JSON.stringify(data.backupMsg), data.sign, floGlobals.supernodes[data.from].pubKey)){
//Backup event messages
//Backup event messages (most crucial part)
if(data.backupMsg.type === "backupData" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID))
reactor.dispatchEvent("store_backup_data", data.backupMsg)
else if(data.backupMsg.type === "supernodeDown")
else if(data.backupMsg.type === "startBackupServe")
reactor.dispatchEvent("start_backup_serve", data.backupMsg.snfloID)
else if(data.backupMsg.type === "supernodeUp")
else if(data.backupMsg.type === "stopBackupServe")
reactor.dispatchEvent("stop_backup_serve", data.backupMsg.snfloID)
else if(data.backupMsg.type === "startBackupStore")
reactor.dispatchEvent("start_backup_store", data.backupMsg.snfloID)
else if(data.backupMsg.type === "stopBackupStore")
reactor.dispatchEvent("stop_backup_store", data.backupMsg.snfloID)
else if(data.backupMsg.type === "dataRequest" && floGlobals.backupStoredList.includes(data.backupMsg.snfloID))
reactor.dispatchEvent("send_stored_backup", data)
}
@ -6303,12 +6347,13 @@
reactor.registerEvent("request_data");
reactor.addEventListener("request_data", function (event) {
console.log("request_data");
//request the backup data
try{
var holderWS = new WebSocket("wss://" + floGlobals.supernodes[event.holder].uri + "/ws")
holderWS.onopen = (evt) => {
var promises = []
for(var i=0; i < floGlobals.storageList.length; i++)
promises[i] = compactIDB.searchData(floGlobals.storageList[i],{lastOnly: true},`SN_${event.floID}`)
promises[i] = compactIDB.searchData(floGlobals.storageList[i], {lastOnly: true},`SN_${event.floID}`)
Promise.all(promises).then(results => {
var lowerKey = {}
for(var i=0; i < results.length; i++)
@ -6335,21 +6380,64 @@
reactor.registerEvent("store_backup_data");
reactor.addEventListener("store_backup_data", function (data) {
console.log("store_backup_data");
//store received backup data
compactIDB.addData(floGlobals.storageList.includes(data.value.application) ? data.value.application:floGlobals.defaultStorage , data.value ,data.key,`SN_${data.snfloID}`)
})
reactor.registerEvent("start_backup_serve");
reactor.addEventListener("start_backup_serve", function (floID) {
console.log("start_backup_serve :"+floID);
floGlobals.serveList.push(floID)
reactor.addEventListener("start_backup_serve", function (snfloID) {
console.log("start_backup_serve :"+snfloID);
//start serving the dead node
if(!floGlobals.serveList.includes(snfloID))
floGlobals.serveList.push(snfloID)
//indicate the last backup node to store the dead's backup too
var sendData = {
from: myFloID,
backupMsg: {
type: "startBackupStore",
snfloID: snfloID,
time: Date.now()
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
var lastIndex = floGlobals.backupNodes.length - 1
floGlobals.backupNodes[lastIndex].wsConn.send(JSON.stringify(sendData))
})
reactor.registerEvent("stop_backup_serve");
reactor.addEventListener("stop_backup_serve", function (floID) {
console.log("stop_backup_serve :"+floID);
floGlobals.serveList = floGlobals.serveList.filter(e => e !== floID)
reactor.addEventListener("stop_backup_serve", function (snfloID) {
console.log("stop_backup_serve :"+snfloID);
//stop serving the revived node
var index = floGlobals.serveList.indexOf(snfloID);
if (index !== -1) floGlobals.serveList.splice(index, 1);
//indicate the last backup node to stop storing the revived's backup
var sendData = {
from: myFloID,
backupMsg: {
type: "stopBackupStore",
snfloID: snfloID,
time: Date.now()
},
sign: floCrypto.signData(JSON.stringify(this.backupMsg), myPrivKey)
}
var lastIndex = floGlobals.backupNodes.length - 1
floGlobals.backupNodes[lastIndex].wsConn.send(JSON.stringify(sendData))
})
reactor.registerEvent("start_backup_store");
reactor.addEventListener("start_backup_store", function (floID) {
console.log("start_backup_store :"+floID);
if(!floGlobals.backupStoredList.includes(floID))
floGlobals.backupStoredList.push(floID)
})
reactor.registerEvent("stop_backup_store");
reactor.addEventListener("stop_backup_store", function (floID) {
console.log("stop_backup_store :"+floID);
var index = floGlobals.backupStoredList.indexOf(floID);
if (index !== -1) floGlobals.backupStoredList.splice(index, 1);
})
</script>
</body>