update modules

This commit is contained in:
sairajzero 2021-07-17 04:34:50 +05:30
parent aa23c30287
commit b0080c5e42
4 changed files with 180 additions and 37 deletions

View File

@ -405,11 +405,13 @@ function sendStoredData(lastlogs, node) {
id: n,
status: true
}))
console.info(`START: ${snID} data sync(send) to ${node.id}`)
//TODO: efficiently handle large number of data instead of loading all into memory
result.forEach(d => node.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: d
})))
console.info(`END: ${snID} data sync(send) to ${node.id}`)
node.send(packet_.constuct({
type: DATA_SYNC,
id: n,
@ -420,6 +422,7 @@ function sendStoredData(lastlogs, node) {
}
}
//Indicate sync of data
function dataSyncIndication(snID, status, from) {
console.info(`${status ? 'START':'END'}: ${snID} data sync(receive) form ${from}`);
}
@ -427,20 +430,47 @@ function dataSyncIndication(snID, status, from) {
//Store (backup) data
function storeBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
if (_list.stored.includes(closestNode)) {
db.storeData(closestNode, data);
}
//tag (backup) data
function tagBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode))
db.storeTag(closestNode, data);
if (_list[closestNode] < backupDepth)
_nextNode.send(packet_.constuct({
type: STORE_BACKUP_DATA,
data: data
}));
}
}
//Tag (backup) data
function tagBackupData(data) {
let closestNode = kBucket.closestNode(data.receiverID);
if (_list.stored.includes(closestNode)) {
db.storeTag(closestNode, data);
if (_list[closestNode] < backupDepth)
_nextNode.send(packet_.constuct({
type: TAG_BACKUP_DATA,
data: data
}));
}
}
//Forward incoming to next node
function forwardToNextNode(mode, data) {
var modeMap = {
'TAG': TAG_BACKUP_DATA,
'DATA': STORE_BACKUP_DATA
}
if(mode in modeMap)
_nextNode.send(packet_.constuct({
type: modeMap[mode],
data: data
}));
}
//-----EXPORTS-----
module.exports = {
processTaskFromSupernode,
setBlockchainParameters,
forwardToNextNode,
SUPERNODE_INDICATOR
}

View File

@ -1,6 +1,32 @@
'use strict';
var mysql = require('mysql');
const Base_Tables = {
LastTxs: {
K: "CHAR(34) NOT NULL",
N: "INT NOT NULL",
PRIMARY: "KEY (K)"
},
Configs: {
K: "VARCHAR(64) NOT NULL",
V: "VARCHAR(512) NOT NULL",
PRIMARY: "KEY (K)"
},
SuperNodes: {
FLO_ID: "CHAR(34) NOT NULL",
PUB_KEY: "CHAR(66) NOT NULL",
URI: "VARCHAR(256) NOT NULL",
PRIMARY: "KEY (FLO_ID)"
},
Applications: {
APP_NAME: "VARCHAR(64) NOT NULL",
ADMIN_ID: "CHAR(34) NOT NULL",
SUB_ADMINS: "VARCHAR(MAX) NOT NULL",
PRIMARY: "KEY (APP_NAME)"
}
}
const H_struct = {
VECTOR_CLOCK: "vectorClock",
SENDER_ID: "senderID",
@ -32,9 +58,22 @@ const T_struct = {
function Database(user, password, dbname, host = 'localhost') {
const db = {};
db.createBase = function() {
return new Promise((resolve, reject) => {
let statements = []
for (t in Base_Tables)
statements.push("CREATE TABLE IF NOT EXISTS " + t + "( " +
Object.keys(Base_Tables[t]).map(a => a + " " + Base_Tables[t][a]).join(", ") + " )");
let query = s => new Promise((res, rej) => db.conn.query(s, (e, r) => e ? res(e) : rej(r)));
Promise.all(statements.forEach(s => query(s)))
.then(result => resolve(result))
.catch(error => reject(error))
})
}
db.createTable = function(snID) {
return new Promise((resolve, reject) => {
let statement1 = "CREATE TABLE " + snID + " (" +
let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " +
H_struct.VECTOR_CLOCK + " VARCHAR(50) NOT NULL, " +
H_struct.SENDER_ID + " CHAR(34) NOT NULL, " +
H_struct.RECEIVER_ID + " CHAR(34) NOT NULL, " +
@ -51,8 +90,9 @@ function Database(user, password, dbname, host = 'localhost') {
T_struct.TAG_TIME + " INT, " +
T_struct.TAG_KEY + " CHAR(66), " +
T_struct.TAG_SIGN + " VARCHAR(160), " +
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")";
db.conn.query(statement1, (err, res) => err ? reject(err) : resolve(res));
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" +
" )";
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
})
}
@ -70,6 +110,9 @@ function Database(user, password, dbname, host = 'localhost') {
let statement = "INSERT INTO " + snID +
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")";
data = Object.fromEntries(attr.map((a, i) => [
[a, values[i]]
]));
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
})
}
@ -86,7 +129,7 @@ function Database(user, password, dbname, host = 'localhost') {
}
let attr = Object.keys(data);
let values = attr.map(a => data[a]);
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
let statement = "UPDATE " + snID +
" SET " + attr.map(a => a + "=?").join(", ") +
" WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock;
@ -119,7 +162,7 @@ function Database(user, password, dbname, host = 'localhost') {
else
conditionArr.push(`${H_struct.SENDER_ID} = '${request.senderID}'`)
}
console.log(conditionArr);
//console.log(conditionArr);
let statement = "SELECT (" + Object.keys(H_struct).join(", ") + ")" +
" FROM " + snID +
" WHERE " + conditionArr.join(" AND ") +
@ -137,12 +180,11 @@ function Database(user, password, dbname, host = 'localhost') {
db.createGetLastLog = function(snID) {
return new Promise((resolve, reject) => {
db.createTable(snID).then(result => {}).catch(error => {})
.finally(_ => {
db.lastLogTime(snID)
.then(result => resolve(result))
.catch(error => reject(error))
})
db.createTable(snID).then(result => {
db.lastLogTime(snID)
.then(result => resolve(result))
.catch(error => reject(error))
}).catch(error => reject(error))
})
}

View File

@ -3,6 +3,7 @@ const WebSocket = require('ws')
const supernode = require('./supernode')
const backupProcess = require('./backupProcess')
const port = 8080;
const server = http.createServer((req, res) => {
if (req.method === "GET") {
//GET request (requesting data)
@ -24,10 +25,12 @@ const server = http.createServer((req, res) => {
//process the data storing
supernode.processIncomingData(data).then(result => {
res.end(result[0]);
if (result[1])
relayToBackupNodes(result[1]) //TODO
if (result[1]) {
if (result[1] === 'DATA')
sendToLiveRequests(result[0])
backupProcess.forwardToNextNode(result[1], result[0])
}
}).catch(error => res.end(error))
})
}
});
@ -39,13 +42,24 @@ const wsServer = new WebSocket.Server({
server
});
wsServer.on('connection', function connection(ws) {
ws.onmessage = function(evt){
ws.onmessage = function(evt) {
let message = evt.data;
if (message.startsWith(backupProcess.SUPERNODE_INDICATOR))
backupProcess.processTaskFromSupernode(message, ws);
else
supernode.processRequestFromUser(JSON.parse(message))
.then(result => ws.send(JSON.parse(result[0])))
.catch(error => ws.send(error))
else {
var request = JSON.parse(message);
supernode.processRequestFromUser(JSON.parse(message)) //TODO: set live request
.then(result => {
ws.send(JSON.parse(result[0]))
ws._liveReq = request;
}).catch(error => ws.send(error))
}
}
});
});
function sendToLiveRequests(data) {
wsServer.clients.forEach(ws => {
if (supernode.checkIfRequestSatisfy(ws._liveReq, data)) //TODO
ws.send(data)
})
}

View File

@ -1,4 +1,7 @@
const db = require("./database")
var db;
function setParameters(db){
db = db
}
function processIncomingData(data) {
return new Promise((resolve, reject) => {
@ -13,14 +16,14 @@ function processIncomingData(data) {
return reject("Invalid Time");
else {
let process;
if (data.request)
process = processRequestFromUser(gid, uid, data);
else if (data.message)
process = processDataFromUser(gid, uid, data);
if (data.request) //Request
process = processRequestFromUser(data.request);
else if (data.message) //Store data
process = processDataFromUser(data);
//else if (data.edit)
// return processEditFromUser(gid, uid, data);
else if (data.mark)
process = processMarkFromUser(gid, uid, data);
else if (data) //Tag data
process = processTagFromUser(gid, uid, data);
//else if (data.delete)
// return processDeleteFromUser(gid, uid, data);
else
@ -64,7 +67,7 @@ function processDataFromUser(data) {
comment: data.comment,
sign: data.sign,
pubKey: data.pubKey
}).then(result => resolve(result))
}).then(result => resolve([result, 'DATA']))
.catch(error => reject(error))
})
}
@ -76,9 +79,63 @@ function processRequestFromUser(request) {
let closeNode = floSupernode.kBucket.closestNode(request.receiverID)
if (!floGlobals.serveList.includes(closeNode))
return reject("Incorrect Supernode");
db.searchData(closeNode, request)
.then(result => resolve(result))
.then(result => resolve([result]))
.catch(error => reject(error))
})
}
function processTagFromUser(data) {
return new Promise((resolve, reject) => {
if (!floCrypto.validateAddr(data.receiverID))
return reject("Invalid receiverID")
if (!(data.application in floGlobals.appList))
return reject("Invalid application")
if (!floCrypto.validateAddr(data.requestorID) ||
!floGlobals.appSubAdmins.includes(data.requestorID))
return reject("Invalid requestorID")
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
return reject("Invalid pubKey")
let closeNode = floSupernode.kBucket.closestNode(data.receiverID)
if (!floGlobals.serveList.includes(closeNode))
return reject("Incorrect Supernode")
let hashcontent = ["time", "application", "tag"]
.map(d => data[d]).join("|");
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
return reject("Invalid signature");
db.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign)
.then(result => resolve([result, 'TAG']))
.catch(error => reject(error))
})
}
function checkIfRequestSatisfy(request, data) {
if (!request || request.mostRecent || request.receiverID !== data.receiverID)
return false;
if (request.atKey && request.atKey !== data.vectorClock)
return false;
if (request.lowerVectorClock && request.lowerVectorClock > data.vectorClock)
return false;
if (request.upperVectorClock && request.upperVectorClock < data.vectorClock)
return false;
if (request.application !== data.application)
return false;
if (request.comment && request.comment !== data.comment)
return false;
if (request.type && request.type !== data.type)
return false;
if (request.senderID) {
if (Array.isArray(request.senderID) && !request.senderID.includes(data.senderID))
return false;
else if (request.senderID !== data.senderID)
return false;
}
return true;
}
module.exports = {
setParameters,
checkIfRequestSatisfy,
processRequestFromUser,
processIncomingData
}