From 968e911b57cdb5ae5bd305cc13c1287bc478094d Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sun, 18 Jul 2021 05:08:33 +0530 Subject: [PATCH] update supernode modules --- src/client.js | 42 +++++++------- src/database.js | 148 ++++++++++++++++++++++++++++++++++++++++-------- src/intra.js | 52 +++++++---------- 3 files changed, 165 insertions(+), 77 deletions(-) diff --git a/src/client.js b/src/client.js index bcf065b..6f6de78 100644 --- a/src/client.js +++ b/src/client.js @@ -1,7 +1,4 @@ -var db; -function setParameters(db){ - db = db -} +var DB; //container for database function processIncomingData(data) { return new Promise((resolve, reject) => { @@ -11,8 +8,8 @@ function processIncomingData(data) { return reject("Data not in JSON-Format") } let curTime = Date.now() - if (!data.time || data.time > curTime + floGlobals.supernodeConfig.delayDelta || - data.time < curTime - floGlobals.supernodeConfig.delayDelta) + if (!data.time || data.time > curTime + floGlobals.sn_config.delayDelta || + data.time < curTime - floGlobals.sn_config.delayDelta) return reject("Invalid Time"); else { let process; @@ -20,31 +17,27 @@ function processIncomingData(data) { process = processRequestFromUser(data.request); else if (data.message) //Store data process = processDataFromUser(data); - //else if (data.edit) - // return processEditFromUser(gid, uid, data); else if (data) //Tag data process = processTagFromUser(gid, uid, data); - //else if (data.delete) - // return processDeleteFromUser(gid, uid, data); + /* + else if (data.edit) + return processEditFromUser(gid, uid, data); + else if (data.delete) + return processDeleteFromUser(gid, uid, data); + */ else return reject("Invalid Data-format") process.then(result => resolve(result)) .catch(error => reject(error)) } - - /* if (floGlobals.supernodeConfig.errorFeedback) - floSupernode.supernodeClientWS.send(`@${uid}#${gid}:${error.toString()}`) - */ }) - - } function processDataFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateAddr(data.receiverID)) return reject("Invalid receiverID") - let closeNode = floSupernode.kBucket.closestNode(data.receiverID) + let closeNode = kBucket.closestNode(data.receiverID) if (!floGlobals.serveList.includes(closeNode)) return reject("Incorrect Supernode") if (!floCrypto.validateAddr(data.receiverID)) @@ -56,7 +49,7 @@ function processDataFromUser(data) { if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) return reject("Invalid signature") - db.addData(closeNode, { + DB.addData(closeNode, { vectorClock: `${Date.now()}_${data.senderID}`, senderID: data.senderID, receiverID: data.receiverID, @@ -76,10 +69,10 @@ function processRequestFromUser(request) { return new Promise((resolve, reject) => { if (!floCrypto.validateAddr(request.receiverID)) return reject("Invalid receiverID"); - let closeNode = floSupernode.kBucket.closestNode(request.receiverID) + let closeNode = kBucket.closestNode(request.receiverID) if (!floGlobals.serveList.includes(closeNode)) return reject("Incorrect Supernode"); - db.searchData(closeNode, request) + DB.searchData(closeNode, request) .then(result => resolve([result])) .catch(error => reject(error)) }) @@ -96,14 +89,14 @@ function processTagFromUser(data) { return reject("Invalid requestorID") if (data.requestorID !== floCrypto.getFloID(data.pubKey)) return reject("Invalid pubKey") - let closeNode = floSupernode.kBucket.closestNode(data.receiverID) + let closeNode = kBucket.closestNode(data.receiverID) if (!floGlobals.serveList.includes(closeNode)) return reject("Incorrect Supernode") let hashcontent = ["time", "application", "tag"] .map(d => data[d]).join("|"); if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) return reject("Invalid signature"); - db.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign) + DB.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign) .then(result => resolve([result, 'TAG'])) .catch(error => reject(error)) }) @@ -137,5 +130,8 @@ module.exports = { setParameters, checkIfRequestSatisfy, processRequestFromUser, - processIncomingData + processIncomingData, + set DB(db){ + DB = db + } } \ No newline at end of file diff --git a/src/database.js b/src/database.js index 73a5a93..7b2930e 100644 --- a/src/database.js +++ b/src/database.js @@ -3,14 +3,14 @@ var mysql = require('mysql'); const Base_Tables = { LastTxs: { - K: "CHAR(34) NOT NULL", + ID: "CHAR(34) NOT NULL", N: "INT NOT NULL", - PRIMARY: "KEY (K)" + PRIMARY: "KEY (ID)" }, Configs: { - K: "VARCHAR(64) NOT NULL", - V: "VARCHAR(512) NOT NULL", - PRIMARY: "KEY (K)" + NAME: "VARCHAR(64) NOT NULL", + VAL: "VARCHAR(512) NOT NULL", + PRIMARY: "KEY (NAME)" }, SuperNodes: { FLO_ID: "CHAR(34) NOT NULL", @@ -21,12 +21,11 @@ const Base_Tables = { Applications: { APP_NAME: "VARCHAR(64) NOT NULL", ADMIN_ID: "CHAR(34) NOT NULL", - SUB_ADMINS: "VARCHAR(MAX) NOT NULL", + SUB_ADMINS: "VARCHAR(MAX)", PRIMARY: "KEY (APP_NAME)" } } - const H_struct = { VECTOR_CLOCK: "vectorClock", SENDER_ID: "senderID", @@ -57,20 +56,103 @@ const T_struct = { function Database(user, password, dbname, host = 'localhost') { const db = {}; + db.query = (s, v) => new Promise((res, rej) => { + const fn = (e, r) => e ? rej(e) : res(r) + v ? db.conn.query(s, v, fn) : db.conn.query(s, fn) + }); db.createBase = function() { return new Promise((resolve, reject) => { let statements = [] - for (t in Base_Tables) + for (let t in Base_Tables) statements.push("CREATE TABLE IF NOT EXISTS " + t + "( " + Object.keys(Base_Tables[t]).map(a => a + " " + Base_Tables[t][a]).join(", ") + " )"); - let query = s => new Promise((res, rej) => db.conn.query(s, (e, r) => e ? res(e) : rej(r))); - Promise.all(statements.forEach(s => query(s))) + Promise.all(statements.forEach(s => db.query(s))) .then(result => resolve(result)) .catch(error => reject(error)) }) } + db.setLastTx = function(id, n) { + return new Promise((resolve, reject) => { + let statement = "INSERT INTO LastTxs (ID, N) VALUES (?, ?)" + + " ON DUPLICATE KEY UPDATE N=?"; + db.query(statement, [id, n, n]) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + } + + db.setConfig = function(name, value) { + return new Promise((resolve, reject) => { + let statement = "INSERT INTO Configs (NAME, VAL) VALUES (?, ?)" + + " ON DUPLICATE KEY UPDATE VAL=?"; + db.query(statement, [name, value, value]) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + } + + db.addSuperNode = function(id, pubKey, uri) { + let statement = "INSERT INTO SuperNodes (FLO_ID, PUB_KEY, URI) VALUES (?, ?, ?)" + + " ON DUPLICATE KEY UPDATE URI=?"; + db.query(statement, [id, pubKey, uri, uri]) + .then(result => resolve(result)) + .catch(error => reject(error)) + } + + db.rmSuperNode = function(id) { + let statement = "DELETE FROM SuperNodes" + + " WHERE FLO_ID=?"; + db.query(statement, id) + .then(result => resolve(result)) + .catch(error => reject(error)) + } + + db.setSubAdmin = function(appName, subAdmins) { + let statement = "UPDATE Applications" + + " SET SUB_ADMINS=?" + + " WHERE APP_NAME=?"; + db.query(statement, [subAdmins.join(","), appName]) + .then(result => resolve(result)) + .catch(error => reject(error)) + } + + db.addApp = function(appName, adminID) { + let statement = "INSERT INTO Applications (APP_NAME, ADMIN_ID) VALUES (?, ?)" + + " ON DUPLICATE KEY UPDATE ADMIN_ID=?"; + db.query(statement, [appName, adminID, adminID]) + .then(result => resolve(result)) + .catch(error => reject(error)) + } + + db.rmApp = function(appName) { + let statement = "DELETE FROM Applications" + + " WHERE APP_NAME=" + appName; + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) + } + + db.getBase = function() { + return new Promise((resolve, reject) => { + let tables = Object.keys(Base_Tables); + Promise.all(tables.forEach(t => db.query("SELECT * FROM " + t))).then(result => { + let tmp = Object.fromEntries(tables.map((t, i) => [t, result[i]])) + result = {}; + result.lastTx = Object.fromEntries(tmp.LastTxs.map(a => [a.ID, a.N])); + result.sn_config = Object.fromEntries(tmp.Configs.map(a => [a.NAME, a.VAL])); + result.appList = Object.fromEntries(tmp.Applications.map(a => [a.APP_NAME, a.ADMIN_ID])); + result.appSubAdmins = Object.fromEntries(tmp.Applications.map(a => [a.APP_NAME, a.SUB_ADMINS.split(",")])) + result.supernodes = Object.fromEntries(tmp.SuperNodes.map(a.FLO_ID, { + pubKey: a.PUB_KEY, + uri: a.URI + })) + resolve(result) + }).catch(error => reject(error)) + }) + } + db.createTable = function(snID) { return new Promise((resolve, reject) => { let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " + @@ -92,14 +174,18 @@ function Database(user, password, dbname, host = 'localhost') { T_struct.TAG_SIGN + " VARCHAR(160), " + "PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" + " )"; - db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) }) } db.dropTable = function(snID) { return new Promise((resolve, reject) => { let statement = "DROP TABLE " + snID; - db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) }) } @@ -113,7 +199,9 @@ function Database(user, password, dbname, host = 'localhost') { data = Object.fromEntries(attr.map((a, i) => [ [a, values[i]] ])); - db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); + db.query(statement, values) + .then(result => resolve(data)) + .catch(error => reject(error)) }) } @@ -124,16 +212,17 @@ function Database(user, password, dbname, host = 'localhost') { [T_struct.TAG_TIME]: tagTime, [T_struct.TAG_KEY]: tagKey, [T_struct.TAG_SIGN]: tagSign, - [L_struct.LOG_TIME]: Date.now(), - [H_struct.VECTOR_CLOCK]: vectorClock + [L_struct.LOG_TIME]: Date.now() } let attr = Object.keys(data); - let values = attr.map(a => data[a]); + let values = attr.map(a => data[a]).concat(vectorClock); data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data let statement = "UPDATE " + snID + " SET " + attr.map(a => a + "=?").join(", ") + - " WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock; - db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; + db.query(statement, values) + .then(result => resolve(data)) + .catch(error => reject(error)) }) } @@ -163,18 +252,23 @@ function Database(user, password, dbname, host = 'localhost') { conditionArr.push(`${H_struct.SENDER_ID} = '${request.senderID}'`) } //console.log(conditionArr); - let statement = "SELECT (" + Object.keys(H_struct).join(", ") + ")" + + let attr = Object.keys(H_struct).concat(Object.keys(B_struct)) + let statement = "SELECT (" + attr.join(", ") + ")" + " FROM " + snID + " WHERE " + conditionArr.join(" AND ") + request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK); - db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) }) } db.lastLogTime = function(snID) { return new Promise((resolve, reject) => { let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID; - db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)) + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) }) } @@ -193,7 +287,9 @@ function Database(user, password, dbname, host = 'localhost') { let statement = "SELECT * FROM " + snID + " WHERE " + L_struct.LOG_TIME + ">=" + logtime + " ORDER BY " + L_struct.LOG_TIME; - db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)) + db.query(statement) + .then(result => resolve(result)) + .catch(error => reject(error)) }) } @@ -207,7 +303,9 @@ function Database(user, password, dbname, host = 'localhost') { " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " + "ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", "); - db.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res)); + db.query(statement, values.concat(u_values)) + .then(result => resolve(data)) + .catch(error => reject(error)) }) } @@ -218,7 +316,9 @@ function Database(user, password, dbname, host = 'localhost') { let statement = "UPDATE " + snID + " SET " + attr.map(a => a + "=?").join(", ") + " WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK]; - db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); + db.query(statement, values) + .then(result => resolve(data)) + .catch(error => reject(error)) }) } diff --git a/src/intra.js b/src/intra.js index 6a2f48b..7b87a0b 100644 --- a/src/intra.js +++ b/src/intra.js @@ -1,7 +1,4 @@ 'use strict'; -const { - stat -} = require('fs'); const WebSocket = require('ws'); //CONSTANTS @@ -24,8 +21,7 @@ const SUPERNODE_INDICATOR = '$', BACKUP_HANDSHAKE_INIT = "handshakeInitate", BACKUP_HANDSHAKE_END = "handshakeEnd"; -//const backupNodes = []; -var backupDepth, supernodeList, appList, kBucket; +var DB //container for database //List of node backups stored const _list = {}; @@ -143,31 +139,22 @@ packet_.s = d => [JSON.stringify(d.message), d.time].join("|"); packet_.parse = function(str) { let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length)) let curTime = Date.now(); - if (packet.time > curTime - delayTime && - floCrypto.verifySign(this.s(packet), packet.sign, supernodeList[packet.from].pubKey)) { + if (packet.time > curTime - floGlobals.sn_config.delayDelta && + floCrypto.verifySign(this.s(packet), packet.sign, floGlobals.supernodes[packet.from].pubKey)) { if (!Array.isArray(packet.message)) packet.message = [packet.message]; return packet; } } -//Set parameters from blockchain -function setBlockchainParameters(depth, supernodes, apps, KB, delay) { - backupDepth = depth; - supernodeList = supernodes; - appList = apps; - kBucket = KB; - delayTime = delay; -} - //-----NODE CONNECTORS (WEBSOCKET)----- //Connect to Node websocket function connectToNode(snID) { return new Promise((resolve, reject) => { - if (!(snID in supernodeList)) + if (!(snID in floGlobals.supernodes)) return reject(`${snID} is not a supernode`) - const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/"); + const ws = new WebSocket("wss://" + floGlobals.supernodes[nextNodeID].uri + "/"); ws.on("error", () => reject(`${snID} is offline`)); ws.on('open', () => resolve(ws)); }) @@ -176,7 +163,7 @@ function connectToNode(snID) { //Connect to Node websocket thats online function connectToActiveNode(snID, reverse = false) { return new Promise((resolve, reject) => { - if (!(snID in supernodeList)) + if (!(snID in floGlobals.supernodes)) return reject(`${snID} is not a supernode`) if (snID === myFloID) return reject(`Reached end of circle. Next node avaiable is self`) @@ -335,7 +322,7 @@ function handshakeMid(id, ws) { }); if (!req_sync.length && !new_order.length) return; //No order change and no need for any data sync - Promise.all(req_sync.forEach(n => db.createGetLastLog(n))).then(result => { + Promise.all(req_sync.forEach(n => DB.createGetLastLog(n))).then(result => { let tasks = []; if (req_sync.length) { tasks.push({ @@ -373,8 +360,8 @@ function reconnectNextNode() { //Case: No other node is online console.error(error); //Serve all nodes - for (let sn in supernodeList) - db.createTable(sn) + for (let sn in floGlobals.supernodes) + DB.createTable(sn) .then(result => _list[sn] = 0) .catch(error => console.error(error)) }) @@ -388,8 +375,8 @@ function orderBackup(order) { let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID); for (let n in order) { if (order[n] + 1 !== _list[n]) { - if (order[n] >= backupDepth) - db.dropTable(n).then(_ => null) + if (order[n] >= floGlobals.sn_config.backupDepth) + DB.dropTable(n).then(_ => null) .catch(error => console.error(error)) .finally(_ => _list.delete(n)) else if (_list[n] !== 0 || !cur_serve.includes(n)) { @@ -410,7 +397,7 @@ function orderBackup(order) { function sendStoredData(lastlogs, node) { for (n in lastlogs) { if (_list.stored.includes(n)) { - db.getData(n, lastlogs[n]).then(result => { + DB.getData(n, lastlogs[n]).then(result => { node.send(packet_.constuct({ type: DATA_SYNC, id: n, @@ -442,8 +429,9 @@ function dataSyncIndication(snID, status, from) { function storeBackupData(data, from, packet) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { - db.storeData(closestNode, data); - if (_list[closestNode] < backupDepth && _nextNode.id !== from) + DB.storeData(closestNode, data); + if (_list[closestNode] < floGlobals.sn_config.backupDepth && + _nextNode.id !== from) _nextNode.send(packet); } } @@ -452,8 +440,9 @@ function storeBackupData(data, from, packet) { function tagBackupData(data, from, packet) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { - db.storeTag(closestNode, data); - if (_list[closestNode] < backupDepth && _nextNode.id !== from) + DB.storeTag(closestNode, data); + if (_list[closestNode] < floGlobals.sn_config.backupDepth && + _nextNode.id !== from) _nextNode.send(packet); } } @@ -476,5 +465,8 @@ module.exports = { processTaskFromSupernode, setBlockchainParameters, forwardToNextNode, - SUPERNODE_INDICATOR + SUPERNODE_INDICATOR, + set DB(db){ + DB = db + } } \ No newline at end of file