From c1e56354b01018e890835058ce085620cd5364f6 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 16 Jul 2021 04:37:00 +0530 Subject: [PATCH] Adding SuperNode modules --- src/backupProcess.js | 335 +++++++++++++++++++++++++++++++++++++++++++ src/database.js | 65 +++++++-- src/server.js | 9 +- src/supernode.js | 84 +++++++++++ 4 files changed, 474 insertions(+), 19 deletions(-) create mode 100644 src/backupProcess.js create mode 100644 src/supernode.js diff --git a/src/backupProcess.js b/src/backupProcess.js new file mode 100644 index 0000000..9264f06 --- /dev/null +++ b/src/backupProcess.js @@ -0,0 +1,335 @@ +'use strict'; +const WebSocket = require('ws'); + +//CONSTANTS +const SUPERNODE_INDICATOR = '$', + //Message type + ORDER_BACKUP = "orderBackup", + START_BACKUP_SERVE = "startBackupServe", + STOP_BACKUP_SERVE = "stopBackupServe", + START_BACKUP_STORE = "startBackupStore", + STOP_BACKUP_STORE = "stopBackupStore", + STORE_BACKUP_DATA = "backupData", + STORE_MIGRATED_DATA = "migratedData", + DELETE_BACKUP_DATA = "backupDelete", + TAG_BACKUP_DATA = "backupTag", + EDIT_BACKUP_DATA = "backupEdit", + NODE_ONLINE = "supernodeUp", + INITIATE_REFRESH = "initiateRefresh", + DATA_REQUEST = "dataRequest", + DATA_SYNC = "dataSync", + BACKUP_HANDSHAKE_INIT = "handshakeInitate", + BACKUP_HANDSHAKE_END = "handshakeEnd"; + +/*Supernode Backup and migration functions*/ +//const backupNodes = []; +var backupDepth, supernodeList, appList, kBucket; + +const _list = {}; +Object.defineProperty(_list, 'delete', { + value: function(id) { + delete this[id]; + } +}); +Object.defineProperty(_list, 'get', { + value: function(keys = null) { + if (keys === null) keys = Object.keys(this); + if (Array.isArray(keys)) + return Object.fromEntries(keys.map(k => [k, this[k]])); + else + return this[keys]; + } +}) +Object.defineProperty(_list, 'stored', { + get: function() { + return Object.keys(this); + } +}); +Object.defineProperty(_list, 'serving', { + get: function() { + let serveList = [] + for (let id in this) + if (this[id] === 0) + serveList.push(id); + return serveList; + } +}); + +const _nextNode = {}; +Object.defineProperty(_nextNode, 'set', { + value: function(id, ws) { + this.id = id; + this.ws = ws; + } +}) +Object.defineProperty(_nextNode, 'send', { + value: function(packet) { + this.ws.send(packet); + } +}); + +const _prevNode = {}; +Object.defineProperty(_prevNode, 'set', { + value: function(id, ws) { + this.id = id; + this.ws = ws; + } +}) +Object.defineProperty(_prevNode, 'send', { + value: function(packet) { + this.ws.send(packet); + } +}); + +const packet_ = {} +packet_.constuct = function(message) { + const packet = { + from: myFloID, + message: message, + time: Date.now() + } + packet.sign = floCrypto.signData(this.s(packet), myPrivKey); + return SUPERNODE_INDICATOR + JSON.stringify(packet) +} +packet_.s = d => [JSON.stringify(d.message), d.time].join("|"); +packet_.parse = function(str) { + let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length)) + let curTime = Date.now(); + if (packet.time > curTime - delayTime && + floCrypto.verifySign(this.s(packet), packet.sign, supernodeList[packet.from].pubKey)) { + if (!Array.isArray(packet.message)) + packet.message = [packet.message]; + return packet; + } +} + +function setBlockchainParameters(depth, supernodes, apps, KB, delay) { + backupDepth = depth; + supernodeList = supernodes; + appList = apps; + kBucket = KB; + delayTime = delay; +} + +function connectToNextNode(nodeID = myFloID) { + return new Promise((resolve, reject) => { + let nextNodeID = kBucket.nextNode(nodeID); + + const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/"); + ws.on("error", () => { + connectToNextNode(nextNodeID) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); + + ws.on('open', function open() { + _nextNode.set(nextNodeID, ws) + ws.send(packet_.constuct({ + type: BACKUP_HANDSHAKE_INIT + })); + resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID) + }); + + ws.on('message', function incoming(packet) { + processTaskFromNextNode(packet) + }); + }) +} + +function handshakeMid(id, ws) { + if (_prevNode.id) { + if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) { + //close existing prev-node connection + _prevNode.send(packet_.constuct({ + type: RECONNECT_NEXT_NODE + })); + _prevNode.close(); + //set the new prev-node connection + _prevNode.set(id, ws); + _prevNode.send(packet_.constuct({ + type: BACKUP_HANDSHAKE_END + })) + } else { + //Incorrect order, existing prev-node is already after the incoming node + ws.send(packet_.constuct({ + type: RECONNECT_NEXT_NODE + })) + return; + } + } else { + //set the new prev-node connection + _prevNode.set(id, ws); + _prevNode.send(packet_.constuct({ + type: BACKUP_HANDSHAKE_END + })) + } + //Reorder storelist + let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID), + req_sync = [], + new_order = []; + nodes.forEach(n => { + switch (_list[n]) { + case 0: //Do nothing + break; + case undefined: + req_sync.push(n); + default: + _list[n] = 0; + new_order.push(n); + } + }); + if (!req_sync.length && !new_order.length) + return; //No order change and no need for any data sync + Promise.all(req_sync.forEach(n => db.createGetLastLog(n))).then(result => { + let tasks = []; + if (req_sync.length) { + tasks.push({ + type: DATA_REQUEST, + nodes: Object.fromEntries(req_sync.map((k, i) => [k, result[i]])) + }) + } + if (new_order.length) { + tasks.push({ + type: ORDER_BACKUP, + order: _list.get(new_order) + }) + } + _nextNode.send(packet_.constuct(tasks)); + }).catch(error => { + FATAL.RECONNECTION_REQUIRED //TODO + }) +} + +function handshakeEnd() { + console.log("Backup connected: " + _nextNode.id) + _nextNode.send(packet_.constuct({ + type: ORDER_BACKUP, + order: _list.get() + })) +} + +function reconnectNextNode() { + connectToNextNode() + .then(result => console.log(result)) + .catch(error => console.error(error)) +} + +function orderBackup(order) { + let new_order = []; + for (let n in order) { + if (order[n] + 1 !== _list[n]) { + if (order[n] >= backupDepth) + REMOVE_STORING_if_exist(N) //TODO + else if (condition) { //TODO: condition to check roll over when less nodes online + _list[n] = order[n] + 1; + new_order.push(n); + } + } + } + if (new_order.length) { + _nextNode.send(packet_.constuct({ + type: ORDER_BACKUP, + order: _list.get(new_order) + })); + } +} + +function storeBackupData(data) { + let closestNode = kBucket.closestNode(data.receiverID); + if (_list.stored.includes(closestNode)) + db.storeData(closestNode, data); +} + +function tagBackupData(data) { + let closestNode = kBucket.closestNode(data.receiverID); + if (_list.stored.includes(closestNode)) + db.storeTag(closestNode, data); +} + +function processTaskFromNextNode(packet) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case RECONNECT_NEXT_NODE: //Triggered when a node inbetween is available + reconnectNextNode(); + break; + case BACKUP_HANDSHAKE_END: + handshakeEnd(); + break; + case DATA_REQUEST: + sendStoredData(task, from) + break; + case DATA_SYNC: + dataSyncIndication(task, from) + break; + } + }) + } +} + +function processTaskFromPrevNode(packet) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case ORDER_BACKUP: + orderBackup(task.order); + break; + case STORE_BACKUP_DATA: + storeBackupData(task.data) + break; + case TAG_BACKUP_DATA: + tagBackupData(task) + break; + case DATA_REQUEST: + sendStoredData(data.sn_msg.snID, data.from) + break; + case DATA_SYNC: + dataSyncIndication(data.sn_msg.snID, data.sn_msg.mode, data.from) + break; + case INITIATE_REFRESH: + initiateRefresh() + break; + } + }); + } +} + +function processTaskFromSupernode(packet, ws) { + var { + from, + message + } = packet_.parse(packet) + if (message) { + message.forEach(task => { + switch (task.type) { + case BACKUP_HANDSHAKE_INIT: + handshakeMid(from, ws) + break; + case STORE_MIGRATED_DATA: + storeMigratedData(data.sn_msg) + break; + } + }); + } +} + +function processTask(packet, ws) { + if (_prevNode.is(ws)) + processTaskFromPrevNode(packet) + else + processTaskFromSupernode(packet, ws) +} + +module.exports = { + processTask, + setBlockchainParameters, + SUPERNODE_INDICATOR +} \ No newline at end of file diff --git a/src/database.js b/src/database.js index df4512f..4acecb0 100644 --- a/src/database.js +++ b/src/database.js @@ -42,7 +42,7 @@ var mysql = require('mysql'); }); conn.connect((err) => { if (err) return reject(err); - this.conn = conn; + db.conn = conn; resolve("Connected") }) }) @@ -68,14 +68,14 @@ var mysql = require('mysql'); T_struct.TAG_KEY + " CHAR(66), " + T_struct.TAG_SIGN + " VARCHAR(160), " + "PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")"; - this.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res)); + db.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res)); }) } db.dropTable = function(snID) { return new Promise((resolve, reject) => { let statement = "DROP TABLE " + snID; - this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); }) } @@ -86,21 +86,27 @@ var mysql = require('mysql'); let statement = "INSERT INTO " + snID + " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")"; - this.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); + db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); }) } db.tagData = function(snID, vectorClock, tag, tagTime, tagKey, tagSign) { return new Promise((resolve, reject) => { + let data = { + [T_struct.TAG]: tag, + [T_struct.TAG_TIME]: tagTime, + [T_struct.TAG_KEY]: tagKey, + [T_struct.TAG_SIGN]: tagSign, + [L_struct.LOG_TIME]: Date.now(), + [H_struct.VECTOR_CLOCK]: vectorClock + } + let attr = Object.keys(data); + let values = attr.map(a => data[a]); + data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data let statement = "UPDATE " + snID + - " SET " + - T_struct.TAG + "=?, " + - T_struct.TAG_TIME + "=?, " + - T_struct.TAG_KEY + "=?, " + - T_struct.TAG_SIGN + "=? " + - L_struct.LOG_TIME + "=?" - " WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock; - this.conn.query(statement, [tag, tagTime, tagKey, tagSign, Date.now()], (err, res) => err ? reject(err) : resolve(res)); + " SET " + attr.map(a => a + "=?").join(", ") + + " WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock; + db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); }) } @@ -134,7 +140,25 @@ var mysql = require('mysql'); " FROM " + snID + " WHERE " + conditionArr.join(" AND ") + request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK); - this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + }) + } + + db.lastLogTime = function(snID) { + return new Promise((resolve, reject) => { + let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID; + db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)) + }) + } + + db.createGetLastLog = function(snID) { + return new Promise((resolve, reject) => { + db.createTable(snID).then(result => {}).catch(error => {}) + .finally(_ => { + db.lastLogTime(snID) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) }) } @@ -143,7 +167,7 @@ var mysql = require('mysql'); let statement = "SELECT * FROM " + snID + " WHERE " + L_struct.LOG_TIME + ">=" + logtime + " ORDER BY " + L_struct.LOG_TIME; - this.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)) + db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)) }) } @@ -157,7 +181,18 @@ var mysql = require('mysql'); " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " + "ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", "); - this.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res)); + db.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res)); + }) + } + + db.storeTag = function(snID, data) { + return new Promise((resolve, reject) => { + let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME); + let values = attr.map(a => data[a]); + let statement = "UPDATE " + snID + + " SET " + attr.map(a => a + "=?").join(", ") + + " WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK]; + db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); }) } })() \ No newline at end of file diff --git a/src/server.js b/src/server.js index e3a0f0a..646952e 100644 --- a/src/server.js +++ b/src/server.js @@ -1,5 +1,7 @@ const http = require('http') const WebSocket = require('ws') +const supernode = require('./supernode') +const backupProcess = require('./backupProcess') const port = 8080; const server = http.createServer((req, res) => { if (req.method === "GET") { @@ -38,11 +40,10 @@ const wsServer = new WebSocket.Server({ }); wsServer.on('connection', function connection(ws) { ws.on('message', message => { - var request = JSON.parse(message) - if (FROM_SUPERNODE) //TODO - backupProcess.processTaskFromSupernode(request, res => ws.send(res)) + if (message.startsWith(backupProcess.SUPERNODE_INDICATOR)) + backupProcess.processTask(message, ws); else - supernode.processRequestFromUser(request) + supernode.processRequestFromUser(JSON.parse(message)) .then(result => ws.send(JSON.parse(result[0]))) .catch(error => ws.send(error)) }); diff --git a/src/supernode.js b/src/supernode.js new file mode 100644 index 0000000..aa813f5 --- /dev/null +++ b/src/supernode.js @@ -0,0 +1,84 @@ +const db = require("./database") + +function processIncomingData(data) { + return new Promise((resolve, reject) => { + try { + data = JSON.parse(data); + } catch (error) { + return reject("Data not in JSON-Format") + } + let curTime = Date.now() + if (!data.time || data.time > curTime + floGlobals.supernodeConfig.delayDelta || + data.time < curTime - floGlobals.supernodeConfig.delayDelta) + return reject("Invalid Time"); + else { + let process; + if (data.request) + process = processRequestFromUser(gid, uid, data); + else if (data.message) + process = processDataFromUser(gid, uid, data); + //else if (data.edit) + // return processEditFromUser(gid, uid, data); + else if (data.mark) + process = processMarkFromUser(gid, uid, data); + //else if (data.delete) + // return processDeleteFromUser(gid, uid, data); + else + return reject("Invalid Data-format") + process.then(result => resolve(result)) + .catch(error => reject(error)) + } + + /* if (floGlobals.supernodeConfig.errorFeedback) + floSupernode.supernodeClientWS.send(`@${uid}#${gid}:${error.toString()}`) + */ + }) + + +} + +function processDataFromUser(data) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(data.receiverID)) + return reject("Invalid receiverID") + let closeNode = floSupernode.kBucket.closestNode(data.receiverID) + if (!floGlobals.serveList.includes(closeNode)) + return reject("Incorrect Supernode") + if (!floCrypto.validateAddr(data.receiverID)) + return reject("Invalid senderID") + if (data.senderID !== floCrypto.getFloID(data.pubKey)) + return reject("Invalid pubKey") + let hashcontent = ["receiverID", "time", "application", "type", "message", "comment"] + .map(d => data[d]).join("|") + if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) + return reject("Invalid signature") + + db.addData(closeNode, { + vectorClock: `${Date.now()}_${data.senderID}`, + senderID: data.senderID, + receiverID: data.receiverID, + time: data.time, + application: data.application, + type: data.type, + message: data.message, + comment: data.comment, + sign: data.sign, + pubKey: data.pubKey + }).then(result => resolve(result)) + .catch(error => reject(error)) + }) +} + +function processRequestFromUser(request) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(request.receiverID)) + return reject("Invalid receiverID"); + let closeNode = floSupernode.kBucket.closestNode(request.receiverID) + if (!floGlobals.serveList.includes(closeNode)) + return reject("Incorrect Supernode"); + + db.searchData(closeNode, request) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) +} \ No newline at end of file