From b0080c5e42fc1291f4c30b087381c4682b20853f Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sat, 17 Jul 2021 04:34:50 +0530 Subject: [PATCH] update modules --- src/backupProcess.js | 44 +++++++++++++++++++++---- src/database.js | 64 +++++++++++++++++++++++++++++------- src/server.js | 32 ++++++++++++------ src/supernode.js | 77 ++++++++++++++++++++++++++++++++++++++------ 4 files changed, 180 insertions(+), 37 deletions(-) diff --git a/src/backupProcess.js b/src/backupProcess.js index 0246166..0a59ad9 100644 --- a/src/backupProcess.js +++ b/src/backupProcess.js @@ -405,11 +405,13 @@ function sendStoredData(lastlogs, node) { id: n, status: true })) + console.info(`START: ${snID} data sync(send) to ${node.id}`) //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => node.send(packet_.constuct({ type: STORE_BACKUP_DATA, data: d }))) + console.info(`END: ${snID} data sync(send) to ${node.id}`) node.send(packet_.constuct({ type: DATA_SYNC, id: n, @@ -420,6 +422,7 @@ function sendStoredData(lastlogs, node) { } } +//Indicate sync of data function dataSyncIndication(snID, status, from) { console.info(`${status ? 'START':'END'}: ${snID} data sync(receive) form ${from}`); } @@ -427,20 +430,47 @@ function dataSyncIndication(snID, status, from) { //Store (backup) data function storeBackupData(data) { let closestNode = kBucket.closestNode(data.receiverID); - if (_list.stored.includes(closestNode)) + if (_list.stored.includes(closestNode)) { db.storeData(closestNode, data); -} -//tag (backup) data -function tagBackupData(data) { - let closestNode = kBucket.closestNode(data.receiverID); - if (_list.stored.includes(closestNode)) - db.storeTag(closestNode, data); + if (_list[closestNode] < backupDepth) + _nextNode.send(packet_.constuct({ + type: STORE_BACKUP_DATA, + data: data + })); + } + } +//Tag (backup) data +function tagBackupData(data) { + let closestNode = kBucket.closestNode(data.receiverID); + if (_list.stored.includes(closestNode)) { + db.storeTag(closestNode, data); + if (_list[closestNode] < backupDepth) + _nextNode.send(packet_.constuct({ + type: TAG_BACKUP_DATA, + data: data + })); + } +} + +//Forward incoming to next node +function forwardToNextNode(mode, data) { + var modeMap = { + 'TAG': TAG_BACKUP_DATA, + 'DATA': STORE_BACKUP_DATA + } + if(mode in modeMap) + _nextNode.send(packet_.constuct({ + type: modeMap[mode], + data: data + })); +} //-----EXPORTS----- module.exports = { processTaskFromSupernode, setBlockchainParameters, + forwardToNextNode, SUPERNODE_INDICATOR } \ No newline at end of file diff --git a/src/database.js b/src/database.js index 5bf3d8b..73a5a93 100644 --- a/src/database.js +++ b/src/database.js @@ -1,6 +1,32 @@ 'use strict'; var mysql = require('mysql'); +const Base_Tables = { + LastTxs: { + K: "CHAR(34) NOT NULL", + N: "INT NOT NULL", + PRIMARY: "KEY (K)" + }, + Configs: { + K: "VARCHAR(64) NOT NULL", + V: "VARCHAR(512) NOT NULL", + PRIMARY: "KEY (K)" + }, + SuperNodes: { + FLO_ID: "CHAR(34) NOT NULL", + PUB_KEY: "CHAR(66) NOT NULL", + URI: "VARCHAR(256) NOT NULL", + PRIMARY: "KEY (FLO_ID)" + }, + Applications: { + APP_NAME: "VARCHAR(64) NOT NULL", + ADMIN_ID: "CHAR(34) NOT NULL", + SUB_ADMINS: "VARCHAR(MAX) NOT NULL", + PRIMARY: "KEY (APP_NAME)" + } +} + + const H_struct = { VECTOR_CLOCK: "vectorClock", SENDER_ID: "senderID", @@ -32,9 +58,22 @@ const T_struct = { function Database(user, password, dbname, host = 'localhost') { const db = {}; + db.createBase = function() { + return new Promise((resolve, reject) => { + let statements = [] + for (t in Base_Tables) + statements.push("CREATE TABLE IF NOT EXISTS " + t + "( " + + Object.keys(Base_Tables[t]).map(a => a + " " + Base_Tables[t][a]).join(", ") + " )"); + let query = s => new Promise((res, rej) => db.conn.query(s, (e, r) => e ? res(e) : rej(r))); + Promise.all(statements.forEach(s => query(s))) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + } + db.createTable = function(snID) { return new Promise((resolve, reject) => { - let statement1 = "CREATE TABLE " + snID + " (" + + let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " + H_struct.VECTOR_CLOCK + " VARCHAR(50) NOT NULL, " + H_struct.SENDER_ID + " CHAR(34) NOT NULL, " + H_struct.RECEIVER_ID + " CHAR(34) NOT NULL, " + @@ -51,8 +90,9 @@ function Database(user, password, dbname, host = 'localhost') { T_struct.TAG_TIME + " INT, " + T_struct.TAG_KEY + " CHAR(66), " + T_struct.TAG_SIGN + " VARCHAR(160), " + - "PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")"; - db.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res)); + "PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" + + " )"; + db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res)); }) } @@ -70,6 +110,9 @@ function Database(user, password, dbname, host = 'localhost') { let statement = "INSERT INTO " + snID + " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")"; + data = Object.fromEntries(attr.map((a, i) => [ + [a, values[i]] + ])); db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data)); }) } @@ -86,7 +129,7 @@ function Database(user, password, dbname, host = 'localhost') { } let attr = Object.keys(data); let values = attr.map(a => data[a]); - data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data + data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data let statement = "UPDATE " + snID + " SET " + attr.map(a => a + "=?").join(", ") + " WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock; @@ -119,7 +162,7 @@ function Database(user, password, dbname, host = 'localhost') { else conditionArr.push(`${H_struct.SENDER_ID} = '${request.senderID}'`) } - console.log(conditionArr); + //console.log(conditionArr); let statement = "SELECT (" + Object.keys(H_struct).join(", ") + ")" + " FROM " + snID + " WHERE " + conditionArr.join(" AND ") + @@ -137,12 +180,11 @@ function Database(user, password, dbname, host = 'localhost') { db.createGetLastLog = function(snID) { return new Promise((resolve, reject) => { - db.createTable(snID).then(result => {}).catch(error => {}) - .finally(_ => { - db.lastLogTime(snID) - .then(result => resolve(result)) - .catch(error => reject(error)) - }) + db.createTable(snID).then(result => { + db.lastLogTime(snID) + .then(result => resolve(result)) + .catch(error => reject(error)) + }).catch(error => reject(error)) }) } diff --git a/src/server.js b/src/server.js index 6bf1fbb..20707dd 100644 --- a/src/server.js +++ b/src/server.js @@ -3,6 +3,7 @@ const WebSocket = require('ws') const supernode = require('./supernode') const backupProcess = require('./backupProcess') const port = 8080; + const server = http.createServer((req, res) => { if (req.method === "GET") { //GET request (requesting data) @@ -24,10 +25,12 @@ const server = http.createServer((req, res) => { //process the data storing supernode.processIncomingData(data).then(result => { res.end(result[0]); - if (result[1]) - relayToBackupNodes(result[1]) //TODO + if (result[1]) { + if (result[1] === 'DATA') + sendToLiveRequests(result[0]) + backupProcess.forwardToNextNode(result[1], result[0]) + } }).catch(error => res.end(error)) - }) } }); @@ -39,13 +42,24 @@ const wsServer = new WebSocket.Server({ server }); wsServer.on('connection', function connection(ws) { - ws.onmessage = function(evt){ + ws.onmessage = function(evt) { let message = evt.data; if (message.startsWith(backupProcess.SUPERNODE_INDICATOR)) backupProcess.processTaskFromSupernode(message, ws); - else - supernode.processRequestFromUser(JSON.parse(message)) - .then(result => ws.send(JSON.parse(result[0]))) - .catch(error => ws.send(error)) + else { + var request = JSON.parse(message); + supernode.processRequestFromUser(JSON.parse(message)) //TODO: set live request + .then(result => { + ws.send(JSON.parse(result[0])) + ws._liveReq = request; + }).catch(error => ws.send(error)) + } } -}); \ No newline at end of file +}); + +function sendToLiveRequests(data) { + wsServer.clients.forEach(ws => { + if (supernode.checkIfRequestSatisfy(ws._liveReq, data)) //TODO + ws.send(data) + }) +} \ No newline at end of file diff --git a/src/supernode.js b/src/supernode.js index aa813f5..bcf065b 100644 --- a/src/supernode.js +++ b/src/supernode.js @@ -1,4 +1,7 @@ -const db = require("./database") +var db; +function setParameters(db){ + db = db +} function processIncomingData(data) { return new Promise((resolve, reject) => { @@ -13,14 +16,14 @@ function processIncomingData(data) { return reject("Invalid Time"); else { let process; - if (data.request) - process = processRequestFromUser(gid, uid, data); - else if (data.message) - process = processDataFromUser(gid, uid, data); + if (data.request) //Request + process = processRequestFromUser(data.request); + else if (data.message) //Store data + process = processDataFromUser(data); //else if (data.edit) // return processEditFromUser(gid, uid, data); - else if (data.mark) - process = processMarkFromUser(gid, uid, data); + else if (data) //Tag data + process = processTagFromUser(gid, uid, data); //else if (data.delete) // return processDeleteFromUser(gid, uid, data); else @@ -64,7 +67,7 @@ function processDataFromUser(data) { comment: data.comment, sign: data.sign, pubKey: data.pubKey - }).then(result => resolve(result)) + }).then(result => resolve([result, 'DATA'])) .catch(error => reject(error)) }) } @@ -76,9 +79,63 @@ function processRequestFromUser(request) { let closeNode = floSupernode.kBucket.closestNode(request.receiverID) if (!floGlobals.serveList.includes(closeNode)) return reject("Incorrect Supernode"); - db.searchData(closeNode, request) - .then(result => resolve(result)) + .then(result => resolve([result])) .catch(error => reject(error)) }) +} + +function processTagFromUser(data) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(data.receiverID)) + return reject("Invalid receiverID") + if (!(data.application in floGlobals.appList)) + return reject("Invalid application") + if (!floCrypto.validateAddr(data.requestorID) || + !floGlobals.appSubAdmins.includes(data.requestorID)) + return reject("Invalid requestorID") + if (data.requestorID !== floCrypto.getFloID(data.pubKey)) + return reject("Invalid pubKey") + let closeNode = floSupernode.kBucket.closestNode(data.receiverID) + if (!floGlobals.serveList.includes(closeNode)) + return reject("Incorrect Supernode") + let hashcontent = ["time", "application", "tag"] + .map(d => data[d]).join("|"); + if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) + return reject("Invalid signature"); + db.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign) + .then(result => resolve([result, 'TAG'])) + .catch(error => reject(error)) + }) +} + +function checkIfRequestSatisfy(request, data) { + if (!request || request.mostRecent || request.receiverID !== data.receiverID) + return false; + if (request.atKey && request.atKey !== data.vectorClock) + return false; + if (request.lowerVectorClock && request.lowerVectorClock > data.vectorClock) + return false; + if (request.upperVectorClock && request.upperVectorClock < data.vectorClock) + return false; + if (request.application !== data.application) + return false; + if (request.comment && request.comment !== data.comment) + return false; + if (request.type && request.type !== data.type) + return false; + if (request.senderID) { + if (Array.isArray(request.senderID) && !request.senderID.includes(data.senderID)) + return false; + else if (request.senderID !== data.senderID) + return false; + } + return true; +} + +module.exports = { + setParameters, + checkIfRequestSatisfy, + processRequestFromUser, + processIncomingData } \ No newline at end of file