Adding SuperNode modules

This commit is contained in:
sairajzero 2021-07-16 04:37:00 +05:30
parent 1f5a9218dd
commit c1e56354b0
4 changed files with 474 additions and 19 deletions

335
src/backupProcess.js Normal file
View File

@ -0,0 +1,335 @@
'use strict';
const WebSocket = require('ws');
//CONSTANTS
const SUPERNODE_INDICATOR = '$',
//Message type
ORDER_BACKUP = "orderBackup",
START_BACKUP_SERVE = "startBackupServe",
STOP_BACKUP_SERVE = "stopBackupServe",
START_BACKUP_STORE = "startBackupStore",
STOP_BACKUP_STORE = "stopBackupStore",
STORE_BACKUP_DATA = "backupData",
STORE_MIGRATED_DATA = "migratedData",
DELETE_BACKUP_DATA = "backupDelete",
TAG_BACKUP_DATA = "backupTag",
EDIT_BACKUP_DATA = "backupEdit",
NODE_ONLINE = "supernodeUp",
INITIATE_REFRESH = "initiateRefresh",
DATA_REQUEST = "dataRequest",
DATA_SYNC = "dataSync",
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
BACKUP_HANDSHAKE_END = "handshakeEnd";
/*Supernode Backup and migration functions*/
//const backupNodes = [];
var backupDepth, supernodeList, appList, kBucket;
const _list = {};
Object.defineProperty(_list, 'delete', {
value: function(id) {
delete this[id];
}
});
Object.defineProperty(_list, 'get', {
value: function(keys = null) {
if (keys === null) keys = Object.keys(this);
if (Array.isArray(keys))
return Object.fromEntries(keys.map(k => [k, this[k]]));
else
return this[keys];
}
})
Object.defineProperty(_list, 'stored', {
get: function() {
return Object.keys(this);
}
});
Object.defineProperty(_list, 'serving', {
get: function() {
let serveList = []
for (let id in this)
if (this[id] === 0)
serveList.push(id);
return serveList;
}
});
const _nextNode = {};
Object.defineProperty(_nextNode, 'set', {
value: function(id, ws) {
this.id = id;
this.ws = ws;
}
})
Object.defineProperty(_nextNode, 'send', {
value: function(packet) {
this.ws.send(packet);
}
});
const _prevNode = {};
Object.defineProperty(_prevNode, 'set', {
value: function(id, ws) {
this.id = id;
this.ws = ws;
}
})
Object.defineProperty(_prevNode, 'send', {
value: function(packet) {
this.ws.send(packet);
}
});
const packet_ = {}
packet_.constuct = function(message) {
const packet = {
from: myFloID,
message: message,
time: Date.now()
}
packet.sign = floCrypto.signData(this.s(packet), myPrivKey);
return SUPERNODE_INDICATOR + JSON.stringify(packet)
}
packet_.s = d => [JSON.stringify(d.message), d.time].join("|");
packet_.parse = function(str) {
let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length))
let curTime = Date.now();
if (packet.time > curTime - delayTime &&
floCrypto.verifySign(this.s(packet), packet.sign, supernodeList[packet.from].pubKey)) {
if (!Array.isArray(packet.message))
packet.message = [packet.message];
return packet;
}
}
function setBlockchainParameters(depth, supernodes, apps, KB, delay) {
backupDepth = depth;
supernodeList = supernodes;
appList = apps;
kBucket = KB;
delayTime = delay;
}
function connectToNextNode(nodeID = myFloID) {
return new Promise((resolve, reject) => {
let nextNodeID = kBucket.nextNode(nodeID);
const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/");
ws.on("error", () => {
connectToNextNode(nextNodeID)
.then(result => resolve(result))
.catch(error => reject(error))
});
ws.on('open', function open() {
_nextNode.set(nextNodeID, ws)
ws.send(packet_.constuct({
type: BACKUP_HANDSHAKE_INIT
}));
resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID)
});
ws.on('message', function incoming(packet) {
processTaskFromNextNode(packet)
});
})
}
function handshakeMid(id, ws) {
if (_prevNode.id) {
if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) {
//close existing prev-node connection
_prevNode.send(packet_.constuct({
type: RECONNECT_NEXT_NODE
}));
_prevNode.close();
//set the new prev-node connection
_prevNode.set(id, ws);
_prevNode.send(packet_.constuct({
type: BACKUP_HANDSHAKE_END
}))
} else {
//Incorrect order, existing prev-node is already after the incoming node
ws.send(packet_.constuct({
type: RECONNECT_NEXT_NODE
}))
return;
}
} else {
//set the new prev-node connection
_prevNode.set(id, ws);
_prevNode.send(packet_.constuct({
type: BACKUP_HANDSHAKE_END
}))
}
//Reorder storelist
let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID),
req_sync = [],
new_order = [];
nodes.forEach(n => {
switch (_list[n]) {
case 0: //Do nothing
break;
case undefined:
req_sync.push(n);
default:
_list[n] = 0;
new_order.push(n);
}
});
if (!req_sync.length && !new_order.length)
return; //No order change and no need for any data sync
Promise.all(req_sync.forEach(n => db.createGetLastLog(n))).then(result => {
let tasks = [];
if (req_sync.length) {
tasks.push({
type: DATA_REQUEST,
nodes: Object.fromEntries(req_sync.map((k, i) => [k, result[i]]))
})
}
if (new_order.length) {
tasks.push({
type: ORDER_BACKUP,
order: _list.get(new_order)
})
}
_nextNode.send(packet_.constuct(tasks));
}).catch(error => {
FATAL.RECONNECTION_REQUIRED //TODO
})
}
function handshakeEnd() {
console.log("Backup connected: " + _nextNode.id)
_nextNode.send(packet_.constuct({
type: ORDER_BACKUP,
order: _list.get()
}))
}
function reconnectNextNode() {
connectToNextNode()
.then(result => console.log(result))
.catch(error => console.error(error))
}
function orderBackup(order) {
let new_order = [];
for (let n in order) {
if (order[n] + 1 !== _list[n]) {
if (order[n] >= backupDepth)
REMOVE_STORING_if_exist(N) //TODO
else if (condition) { //TODO: condition to check roll over when less nodes online
_list[n] = order[n] + 1;
new_order.push(n);
}
}
}
if (new_order.length) {
_nextNode.send(packet_.constuct({
type: ORDER_BACKUP,
order: _list.get(new_order)
}));
}
}
function storeBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
db.storeData(closestNode, data);
}
function tagBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
db.storeTag(closestNode, data);
}
function processTaskFromNextNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available
reconnectNextNode();
break;
case BACKUP_HANDSHAKE_END:
handshakeEnd();
break;
case DATA_REQUEST:
sendStoredData(task, from)
break;
case DATA_SYNC:
dataSyncIndication(task, from)
break;
}
})
}
}
function processTaskFromPrevNode(packet) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case ORDER_BACKUP:
orderBackup(task.order);
break;
case STORE_BACKUP_DATA:
storeBackupData(task.data)
break;
case TAG_BACKUP_DATA:
tagBackupData(task)
break;
case DATA_REQUEST:
sendStoredData(data.sn_msg.snID, data.from)
break;
case DATA_SYNC:
dataSyncIndication(data.sn_msg.snID, data.sn_msg.mode, data.from)
break;
case INITIATE_REFRESH:
initiateRefresh()
break;
}
});
}
}
function processTaskFromSupernode(packet, ws) {
var {
from,
message
} = packet_.parse(packet)
if (message) {
message.forEach(task => {
switch (task.type) {
case BACKUP_HANDSHAKE_INIT:
handshakeMid(from, ws)
break;
case STORE_MIGRATED_DATA:
storeMigratedData(data.sn_msg)
break;
}
});
}
}
function processTask(packet, ws) {
if (_prevNode.is(ws))
processTaskFromPrevNode(packet)
else
processTaskFromSupernode(packet, ws)
}
module.exports = {
processTask,
setBlockchainParameters,
SUPERNODE_INDICATOR
}

View File

@ -42,7 +42,7 @@ var mysql = require('mysql');
});
conn.connect((err) => {
if (err) return reject(err);
this.conn = conn;
db.conn = conn;
resolve("Connected")
})
})
@ -68,14 +68,14 @@ var mysql = require('mysql');
T_struct.TAG_KEY + " CHAR(66), " +
T_struct.TAG_SIGN + " VARCHAR(160), " +
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")";
this.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res));
db.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res));
})
}
db.dropTable = function(snID) {
return new Promise((resolve, reject) => {
let statement = "DROP TABLE " + snID;
this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
})
}
@ -86,21 +86,27 @@ var mysql = require('mysql');
let statement = "INSERT INTO " + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")";
this.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
})
}
db.tagData = function(snID, vectorClock, tag, tagTime, tagKey, tagSign) {
return new Promise((resolve, reject) => {
let data = {
[T_struct.TAG]: tag,
[T_struct.TAG_TIME]: tagTime,
[T_struct.TAG_KEY]: tagKey,
[T_struct.TAG_SIGN]: tagSign,
[L_struct.LOG_TIME]: Date.now(),
[H_struct.VECTOR_CLOCK]: vectorClock
}
let attr = Object.keys(data);
let values = attr.map(a => data[a]);
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
let statement = "UPDATE " + snID +
" SET " +
T_struct.TAG + "=?, " +
T_struct.TAG_TIME + "=?, " +
T_struct.TAG_KEY + "=?, " +
T_struct.TAG_SIGN + "=? " +
L_struct.LOG_TIME + "=?"
" WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock;
this.conn.query(statement, [tag, tagTime, tagKey, tagSign, Date.now()], (err, res) => err ? reject(err) : resolve(res));
" SET " + attr.map(a => a + "=?").join(", ") +
" WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock;
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
})
}
@ -134,7 +140,25 @@ var mysql = require('mysql');
" FROM " + snID +
" WHERE " + conditionArr.join(" AND ") +
request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK);
this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
})
}
db.lastLogTime = function(snID) {
return new Promise((resolve, reject) => {
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID;
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res))
})
}
db.createGetLastLog = function(snID) {
return new Promise((resolve, reject) => {
db.createTable(snID).then(result => {}).catch(error => {})
.finally(_ => {
db.lastLogTime(snID)
.then(result => resolve(result))
.catch(error => reject(error))
})
})
}
@ -143,7 +167,7 @@ var mysql = require('mysql');
let statement = "SELECT * FROM " + snID +
" WHERE " + L_struct.LOG_TIME + ">=" + logtime +
" ORDER BY " + L_struct.LOG_TIME;
this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res))
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res))
})
}
@ -157,7 +181,18 @@ var mysql = require('mysql');
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " +
"ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", ");
this.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res));
db.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res));
})
}
db.storeTag = function(snID, data) {
return new Promise((resolve, reject) => {
let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME);
let values = attr.map(a => data[a]);
let statement = "UPDATE " + snID +
" SET " + attr.map(a => a + "=?").join(", ") +
" WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK];
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
})
}
})()

View File

@ -1,5 +1,7 @@
const http = require('http')
const WebSocket = require('ws')
const supernode = require('./supernode')
const backupProcess = require('./backupProcess')
const port = 8080;
const server = http.createServer((req, res) => {
if (req.method === "GET") {
@ -38,11 +40,10 @@ const wsServer = new WebSocket.Server({
});
wsServer.on('connection', function connection(ws) {
ws.on('message', message => {
var request = JSON.parse(message)
if (FROM_SUPERNODE) //TODO
backupProcess.processTaskFromSupernode(request, res => ws.send(res))
if (message.startsWith(backupProcess.SUPERNODE_INDICATOR))
backupProcess.processTask(message, ws);
else
supernode.processRequestFromUser(request)
supernode.processRequestFromUser(JSON.parse(message))
.then(result => ws.send(JSON.parse(result[0])))
.catch(error => ws.send(error))
});

84
src/supernode.js Normal file
View File

@ -0,0 +1,84 @@
const db = require("./database")
function processIncomingData(data) {
return new Promise((resolve, reject) => {
try {
data = JSON.parse(data);
} catch (error) {
return reject("Data not in JSON-Format")
}
let curTime = Date.now()
if (!data.time || data.time > curTime + floGlobals.supernodeConfig.delayDelta ||
data.time < curTime - floGlobals.supernodeConfig.delayDelta)
return reject("Invalid Time");
else {
let process;
if (data.request)
process = processRequestFromUser(gid, uid, data);
else if (data.message)
process = processDataFromUser(gid, uid, data);
//else if (data.edit)
// return processEditFromUser(gid, uid, data);
else if (data.mark)
process = processMarkFromUser(gid, uid, data);
//else if (data.delete)
// return processDeleteFromUser(gid, uid, data);
else
return reject("Invalid Data-format")
process.then(result => resolve(result))
.catch(error => reject(error))
}
/* if (floGlobals.supernodeConfig.errorFeedback)
floSupernode.supernodeClientWS.send(`@${uid}#${gid}:${error.toString()}`)
*/
})
}
function processDataFromUser(data) {
return new Promise((resolve, reject) => {
if (!floCrypto.validateAddr(data.receiverID))
return reject("Invalid receiverID")
let closeNode = floSupernode.kBucket.closestNode(data.receiverID)
if (!floGlobals.serveList.includes(closeNode))
return reject("Incorrect Supernode")
if (!floCrypto.validateAddr(data.receiverID))
return reject("Invalid senderID")
if (data.senderID !== floCrypto.getFloID(data.pubKey))
return reject("Invalid pubKey")
let hashcontent = ["receiverID", "time", "application", "type", "message", "comment"]
.map(d => data[d]).join("|")
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
return reject("Invalid signature")
db.addData(closeNode, {
vectorClock: `${Date.now()}_${data.senderID}`,
senderID: data.senderID,
receiverID: data.receiverID,
time: data.time,
application: data.application,
type: data.type,
message: data.message,
comment: data.comment,
sign: data.sign,
pubKey: data.pubKey
}).then(result => resolve(result))
.catch(error => reject(error))
})
}
function processRequestFromUser(request) {
return new Promise((resolve, reject) => {
if (!floCrypto.validateAddr(request.receiverID))
return reject("Invalid receiverID");
let closeNode = floSupernode.kBucket.closestNode(request.receiverID)
if (!floGlobals.serveList.includes(closeNode))
return reject("Incorrect Supernode");
db.searchData(closeNode, request)
.then(result => resolve(result))
.catch(error => reject(error))
})
}