update supernode modules

This commit is contained in:
sairajzero 2021-07-16 20:01:45 +05:30
parent ac5a90ccd0
commit aa23c30287
3 changed files with 290 additions and 174 deletions

View File

@ -1,4 +1,7 @@
'use strict';
const {
stat
} = require('fs');
const WebSocket = require('ws');
//CONSTANTS
@ -21,10 +24,10 @@ const SUPERNODE_INDICATOR = '$',
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
BACKUP_HANDSHAKE_END = "handshakeEnd";
/*Supernode Backup and migration functions*/
//const backupNodes = [];
var backupDepth, supernodeList, appList, kBucket;
//List of node backups stored
const _list = {};
Object.defineProperty(_list, 'delete', {
value: function(id) {
@ -55,32 +58,77 @@ Object.defineProperty(_list, 'serving', {
}
});
const _nextNode = {};
Object.defineProperty(_nextNode, 'set', {
value: function(id, ws) {
this.id = id;
this.ws = ws;
}
})
Object.defineProperty(_nextNode, 'send', {
value: function(packet) {
this.ws.send(packet);
}
});
//Node container
function NodeContainer() {
var _ws, _id, _onmessage, _onclose;
Object.defineProperty(this, 'set', {
value: function(id, ws) {
if (_ws !== undefined)
this.close();
_id = id;
_ws = ws;
if (_onmessage)
_ws.onmessage = _onmessage;
if (_onclose)
_ws.onclose = _onclose;
}
})
Object.defineProperty(this, 'id', {
get: function() {
return _id;
}
})
Object.defineProperty(this, 'readyState', {
get: function() {
if (_ws instanceof WebSocket)
return _ws.readyState;
else
return null;
}
})
Object.defineProperty(this, 'send', {
value: function(packet) {
_ws.send(packet);
}
});
Object.defineProperty(this, 'onmessage', {
set: function(fn) {
if (fn instanceof Function)
_onmessage = fn;
}
})
Object.defineProperty(this, 'onclose', {
set: function(fn) {
if (fn instanceof Function)
_onclose = fn;
}
})
Object.defineProperty(this, 'is', {
value: function(ws) {
return ws === _ws;
}
})
Object.defineProperty(this, 'close', {
value: function() {
if (_ws.readyState === 1) {
_ws.onclose = () => console.warn('Closing: ' + _id)
_ws.close();
}
_ws = _id = undefined;
}
})
}
const _prevNode = {};
Object.defineProperty(_prevNode, 'set', {
value: function(id, ws) {
this.id = id;
this.ws = ws;
}
})
Object.defineProperty(_prevNode, 'send', {
value: function(packet) {
this.ws.send(packet);
}
});
//Container for next-node
const _nextNode = new NodeContainer();
_nextNode.onmessage = evt => processTaskFromNextNode(evt.data);
_nextNode.onclose = evt => reconnectNextNode();
//Container for prev-node
const _prevNode = new NodeContainer();
_prevNode.onmessage = evt => processTaskFromPrevNode(evt.data);
_prevNode.onclose = evt => _prevNode.close();
//Packet processing
const packet_ = {}
packet_.constuct = function(message) {
const packet = {
@ -103,6 +151,7 @@ packet_.parse = function(str) {
}
}
//Set parameters from blockchain
function setBlockchainParameters(depth, supernodes, apps, KB, delay) {
backupDepth = depth;
supernodeList = supernodes;
@ -111,31 +160,137 @@ function setBlockchainParameters(depth, supernodes, apps, KB, delay) {
delayTime = delay;
}
function connectToNextNode(nodeID = myFloID) {
//-----NODE CONNECTORS (WEBSOCKET)-----
//Connect to Node websocket
function connectToNode(snID) {
return new Promise((resolve, reject) => {
let nextNodeID = kBucket.nextNode(nodeID);
if (!(snID in supernodeList))
return reject(`${snID} is not a supernode`)
const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/");
ws.on("error", () => {
connectToNextNode(nextNodeID)
.then(result => resolve(result))
.catch(error => reject(error))
});
ws.on('open', function open() {
_nextNode.set(nextNodeID, ws)
ws.send(packet_.constuct({
type: BACKUP_HANDSHAKE_INIT
}));
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID)
});
ws.on('message', function incoming(packet) {
processTaskFromNextNode(packet)
});
ws.on("error", () => reject(`${snID} is offline`));
ws.on('open', () => resolve(ws));
})
}
//Connect to Node websocket thats online
function connectToActiveNode(snID, reverse = false) {
return new Promise((resolve, reject) => {
if (!(snID in supernodeList))
return reject(`${snID} is not a supernode`)
if (snID === myFloID)
return reject(`Reached end of circle. Next node avaiable is self`)
connectToNode(snID)
.then(ws => resolve(ws))
.catch(error => {
var next = reverse ? kBucket.prevNode(snID) : kBucket.nextNode(snID)
connectToActiveNode(next, reverse)
.then(ws => resolve(ws))
.catch(error => reject(error))
})
})
}
//Connect to next available node
function connectToNextNode() {
return new Promise((resolve, reject) => {
let nextNodeID = kBucket.nextNode(nodeID);
connectToActiveNode(nextNodeID).then(ws => {
_nextNode.set(nextNodeID, ws)
_nextNode.send(packet_.constuct({
type: BACKUP_HANDSHAKE_INIT
}));
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID)
}).catch(error => reject(error))
})
}
//-----PROCESS TASKS-----
//Tasks from next-node
function processTaskFromNextNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available
reconnectNextNode()
break;
case BACKUP_HANDSHAKE_END:
handshakeEnd()
break;
case DATA_REQUEST:
sendStoredData(task.nodes, _nextNode)
break;
case DATA_SYNC:
dataSyncIndication(task.id, task.status, from)
break;
case STORE_BACKUP_DATA:
storeBackupData(task.data)
break;
}
})
}
}
//Tasks from prev-node
function processTaskFromPrevNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case ORDER_BACKUP:
orderBackup(task.order)
break;
case STORE_BACKUP_DATA:
storeBackupData(task.data)
break;
case TAG_BACKUP_DATA:
tagBackupData(task.data)
break;
case DATA_REQUEST:
sendStoredData(task.nodes, _prevNode)
break;
case DATA_SYNC:
dataSyncIndication(task.id, task.status, from)
break;
case INITIATE_REFRESH: //TODO
initiateRefresh()
break;
}
});
}
}
//Tasks from any supernode
function processTaskFromSupernode(packet, ws) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case BACKUP_HANDSHAKE_INIT:
handshakeMid(from, ws)
break;
case STORE_MIGRATED_DATA: //TODO
storeMigratedData(task.data)
break;
}
});
}
}
//-----HANDSHAKE PROCESS-----
//Acknowledge handshake
function handshakeMid(id, ws) {
if (_prevNode.id) {
if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) {
@ -200,6 +355,7 @@ function handshakeMid(id, ws) {
})
}
//Complete handshake
function handshakeEnd() {
console.log("Backup connected: " + _nextNode.id)
_nextNode.send(packet_.constuct({
@ -208,12 +364,17 @@ function handshakeEnd() {
}))
}
//Reconnect to next available node
function reconnectNextNode() {
_nextNode.close();
connectToNextNode()
.then(result => console.log(result))
.catch(error => console.error(error))
}
//-----BACKUP TASKS-----
//Order the stored backup
function orderBackup(order) {
let new_order = [];
for (let n in order) {
@ -234,102 +395,52 @@ function orderBackup(order) {
}
}
//Send stored data
function sendStoredData(lastlogs, node) {
for (n in lastlogs) {
if (_list.stored.includes(n)) {
db.getData(n, lastlogs[n]).then(result => {
node.send(packet_.constuct({
type: DATA_SYNC,
id: n,
status: true
}))
//TODO: efficiently handle large number of data instead of loading all into memory
result.forEach(d => node.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: d
})))
node.send(packet_.constuct({
type: DATA_SYNC,
id: n,
status: false
}))
}).catch(error => reject(error))
}
}
}
function dataSyncIndication(snID, status, from) {
console.info(`${status ? 'START':'END'}: ${snID} data sync(receive) form ${from}`);
}
//Store (backup) data
function storeBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
db.storeData(closestNode, data);
}
//tag (backup) data
function tagBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
db.storeTag(closestNode, data);
}
function processTaskFromNextNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available
reconnectNextNode();
break;
case BACKUP_HANDSHAKE_END:
handshakeEnd();
break;
case DATA_REQUEST:
sendStoredData(task, from)
break;
case DATA_SYNC:
dataSyncIndication(task, from)
break;
}
})
}
}
function processTaskFromPrevNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case ORDER_BACKUP:
orderBackup(task.order);
break;
case STORE_BACKUP_DATA:
storeBackupData(task.data)
break;
case TAG_BACKUP_DATA:
tagBackupData(task)
break;
case DATA_REQUEST:
sendStoredData(data.sn_msg.snID, data.from)
break;
case DATA_SYNC:
dataSyncIndication(data.sn_msg.snID, data.sn_msg.mode, data.from)
break;
case INITIATE_REFRESH:
initiateRefresh()
break;
}
});
}
}
function processTaskFromSupernode(packet, ws) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case BACKUP_HANDSHAKE_INIT:
handshakeMid(from, ws)
break;
case STORE_MIGRATED_DATA:
storeMigratedData(data.sn_msg)
break;
}
});
}
}
function processTask(packet, ws) {
if (_prevNode.is(ws))
processTaskFromPrevNode(packet)
else
processTaskFromSupernode(packet, ws)
}
//-----EXPORTS-----
module.exports = {
processTask,
processTaskFromSupernode,
setBlockchainParameters,
SUPERNODE_INDICATOR
}

View File

@ -1,52 +1,36 @@
'use strict';
var mysql = require('mysql');
(function() {
var db = module.exports = {}
const H_struct = {
VECTOR_CLOCK: "vectorClock",
SENDER_ID: "senderID",
RECEIVER_ID: "receiverID",
TYPE: "type",
APPLICATION: "application"
}
const H_struct = {
VECTOR_CLOCK: "vectorClock",
SENDER_ID: "senderID",
RECEIVER_ID: "receiverID",
TYPE: "type",
APPLICATION: "application"
}
const B_struct = {
TIME: "time",
MESSAGE: "message",
SIGNATURE: "sign",
PUB_KEY: "pubKey",
COMMENT: "comment"
}
const B_struct = {
TIME: "time",
MESSAGE: "message",
SIGNATURE: "sign",
PUB_KEY: "pubKey",
COMMENT: "comment"
}
const L_struct = {
STATUS: "status_n",
LOG_TIME: "log_time"
}
const L_struct = {
STATUS: "status_n",
LOG_TIME: "log_time"
}
const T_struct = {
TAG: "tag",
TAG_TIME: "tag_time",
TAG_KEY: "tag_key",
TAG_SIGN: "tag_sign"
}
const T_struct = {
TAG: "tag",
TAG_TIME: "tag_time",
TAG_KEY: "tag_key",
TAG_SIGN: "tag_sign"
}
db.connect = function(user, password, db, host = 'localhost') {
return new Promise((resolve, reject) => {
let conn = mysql.createConnection({
host: host,
user: user,
password: password,
database: db
});
conn.connect((err) => {
if (err) return reject(err);
db.conn = conn;
resolve("Connected")
})
})
}
function Database(user, password, dbname, host = 'localhost') {
const db = {};
db.createTable = function(snID) {
return new Promise((resolve, reject) => {
@ -195,4 +179,24 @@ var mysql = require('mysql');
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
})
}
})()
db.close = function() {
db.conn.end();
}
return new Promise((resolve, reject) => {
let conn = mysql.createConnection({
host: host,
user: user,
password: password,
database: dbname
});
conn.connect((err) => {
if (err) return reject(err);
db.conn = conn;
resolve(db)
});
})
}
module.exports = Database;

View File

@ -39,12 +39,13 @@ const wsServer = new WebSocket.Server({
server
});
wsServer.on('connection', function connection(ws) {
ws.on('message', message => {
ws.onmessage = function(evt){
let message = evt.data;
if (message.startsWith(backupProcess.SUPERNODE_INDICATOR))
backupProcess.processTask(message, ws);
backupProcess.processTaskFromSupernode(message, ws);
else
supernode.processRequestFromUser(JSON.parse(message))
.then(result => ws.send(JSON.parse(result[0])))
.catch(error => ws.send(error))
});
}
});