update supernode modules
This commit is contained in:
parent
71846daf8a
commit
968e911b57
@ -1,7 +1,4 @@
|
|||||||
var db;
|
var DB; //container for database
|
||||||
function setParameters(db){
|
|
||||||
db = db
|
|
||||||
}
|
|
||||||
|
|
||||||
function processIncomingData(data) {
|
function processIncomingData(data) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@ -11,8 +8,8 @@ function processIncomingData(data) {
|
|||||||
return reject("Data not in JSON-Format")
|
return reject("Data not in JSON-Format")
|
||||||
}
|
}
|
||||||
let curTime = Date.now()
|
let curTime = Date.now()
|
||||||
if (!data.time || data.time > curTime + floGlobals.supernodeConfig.delayDelta ||
|
if (!data.time || data.time > curTime + floGlobals.sn_config.delayDelta ||
|
||||||
data.time < curTime - floGlobals.supernodeConfig.delayDelta)
|
data.time < curTime - floGlobals.sn_config.delayDelta)
|
||||||
return reject("Invalid Time");
|
return reject("Invalid Time");
|
||||||
else {
|
else {
|
||||||
let process;
|
let process;
|
||||||
@ -20,31 +17,27 @@ function processIncomingData(data) {
|
|||||||
process = processRequestFromUser(data.request);
|
process = processRequestFromUser(data.request);
|
||||||
else if (data.message) //Store data
|
else if (data.message) //Store data
|
||||||
process = processDataFromUser(data);
|
process = processDataFromUser(data);
|
||||||
//else if (data.edit)
|
|
||||||
// return processEditFromUser(gid, uid, data);
|
|
||||||
else if (data) //Tag data
|
else if (data) //Tag data
|
||||||
process = processTagFromUser(gid, uid, data);
|
process = processTagFromUser(gid, uid, data);
|
||||||
//else if (data.delete)
|
/*
|
||||||
// return processDeleteFromUser(gid, uid, data);
|
else if (data.edit)
|
||||||
|
return processEditFromUser(gid, uid, data);
|
||||||
|
else if (data.delete)
|
||||||
|
return processDeleteFromUser(gid, uid, data);
|
||||||
|
*/
|
||||||
else
|
else
|
||||||
return reject("Invalid Data-format")
|
return reject("Invalid Data-format")
|
||||||
process.then(result => resolve(result))
|
process.then(result => resolve(result))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if (floGlobals.supernodeConfig.errorFeedback)
|
|
||||||
floSupernode.supernodeClientWS.send(`@${uid}#${gid}:${error.toString()}`)
|
|
||||||
*/
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
function processDataFromUser(data) {
|
function processDataFromUser(data) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateAddr(data.receiverID))
|
if (!floCrypto.validateAddr(data.receiverID))
|
||||||
return reject("Invalid receiverID")
|
return reject("Invalid receiverID")
|
||||||
let closeNode = floSupernode.kBucket.closestNode(data.receiverID)
|
let closeNode = kBucket.closestNode(data.receiverID)
|
||||||
if (!floGlobals.serveList.includes(closeNode))
|
if (!floGlobals.serveList.includes(closeNode))
|
||||||
return reject("Incorrect Supernode")
|
return reject("Incorrect Supernode")
|
||||||
if (!floCrypto.validateAddr(data.receiverID))
|
if (!floCrypto.validateAddr(data.receiverID))
|
||||||
@ -56,7 +49,7 @@ function processDataFromUser(data) {
|
|||||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||||
return reject("Invalid signature")
|
return reject("Invalid signature")
|
||||||
|
|
||||||
db.addData(closeNode, {
|
DB.addData(closeNode, {
|
||||||
vectorClock: `${Date.now()}_${data.senderID}`,
|
vectorClock: `${Date.now()}_${data.senderID}`,
|
||||||
senderID: data.senderID,
|
senderID: data.senderID,
|
||||||
receiverID: data.receiverID,
|
receiverID: data.receiverID,
|
||||||
@ -76,10 +69,10 @@ function processRequestFromUser(request) {
|
|||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!floCrypto.validateAddr(request.receiverID))
|
if (!floCrypto.validateAddr(request.receiverID))
|
||||||
return reject("Invalid receiverID");
|
return reject("Invalid receiverID");
|
||||||
let closeNode = floSupernode.kBucket.closestNode(request.receiverID)
|
let closeNode = kBucket.closestNode(request.receiverID)
|
||||||
if (!floGlobals.serveList.includes(closeNode))
|
if (!floGlobals.serveList.includes(closeNode))
|
||||||
return reject("Incorrect Supernode");
|
return reject("Incorrect Supernode");
|
||||||
db.searchData(closeNode, request)
|
DB.searchData(closeNode, request)
|
||||||
.then(result => resolve([result]))
|
.then(result => resolve([result]))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
@ -96,14 +89,14 @@ function processTagFromUser(data) {
|
|||||||
return reject("Invalid requestorID")
|
return reject("Invalid requestorID")
|
||||||
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
|
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
|
||||||
return reject("Invalid pubKey")
|
return reject("Invalid pubKey")
|
||||||
let closeNode = floSupernode.kBucket.closestNode(data.receiverID)
|
let closeNode = kBucket.closestNode(data.receiverID)
|
||||||
if (!floGlobals.serveList.includes(closeNode))
|
if (!floGlobals.serveList.includes(closeNode))
|
||||||
return reject("Incorrect Supernode")
|
return reject("Incorrect Supernode")
|
||||||
let hashcontent = ["time", "application", "tag"]
|
let hashcontent = ["time", "application", "tag"]
|
||||||
.map(d => data[d]).join("|");
|
.map(d => data[d]).join("|");
|
||||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||||
return reject("Invalid signature");
|
return reject("Invalid signature");
|
||||||
db.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign)
|
DB.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign)
|
||||||
.then(result => resolve([result, 'TAG']))
|
.then(result => resolve([result, 'TAG']))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
@ -137,5 +130,8 @@ module.exports = {
|
|||||||
setParameters,
|
setParameters,
|
||||||
checkIfRequestSatisfy,
|
checkIfRequestSatisfy,
|
||||||
processRequestFromUser,
|
processRequestFromUser,
|
||||||
processIncomingData
|
processIncomingData,
|
||||||
|
set DB(db){
|
||||||
|
DB = db
|
||||||
|
}
|
||||||
}
|
}
|
||||||
148
src/database.js
148
src/database.js
@ -3,14 +3,14 @@ var mysql = require('mysql');
|
|||||||
|
|
||||||
const Base_Tables = {
|
const Base_Tables = {
|
||||||
LastTxs: {
|
LastTxs: {
|
||||||
K: "CHAR(34) NOT NULL",
|
ID: "CHAR(34) NOT NULL",
|
||||||
N: "INT NOT NULL",
|
N: "INT NOT NULL",
|
||||||
PRIMARY: "KEY (K)"
|
PRIMARY: "KEY (ID)"
|
||||||
},
|
},
|
||||||
Configs: {
|
Configs: {
|
||||||
K: "VARCHAR(64) NOT NULL",
|
NAME: "VARCHAR(64) NOT NULL",
|
||||||
V: "VARCHAR(512) NOT NULL",
|
VAL: "VARCHAR(512) NOT NULL",
|
||||||
PRIMARY: "KEY (K)"
|
PRIMARY: "KEY (NAME)"
|
||||||
},
|
},
|
||||||
SuperNodes: {
|
SuperNodes: {
|
||||||
FLO_ID: "CHAR(34) NOT NULL",
|
FLO_ID: "CHAR(34) NOT NULL",
|
||||||
@ -21,12 +21,11 @@ const Base_Tables = {
|
|||||||
Applications: {
|
Applications: {
|
||||||
APP_NAME: "VARCHAR(64) NOT NULL",
|
APP_NAME: "VARCHAR(64) NOT NULL",
|
||||||
ADMIN_ID: "CHAR(34) NOT NULL",
|
ADMIN_ID: "CHAR(34) NOT NULL",
|
||||||
SUB_ADMINS: "VARCHAR(MAX) NOT NULL",
|
SUB_ADMINS: "VARCHAR(MAX)",
|
||||||
PRIMARY: "KEY (APP_NAME)"
|
PRIMARY: "KEY (APP_NAME)"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const H_struct = {
|
const H_struct = {
|
||||||
VECTOR_CLOCK: "vectorClock",
|
VECTOR_CLOCK: "vectorClock",
|
||||||
SENDER_ID: "senderID",
|
SENDER_ID: "senderID",
|
||||||
@ -57,20 +56,103 @@ const T_struct = {
|
|||||||
|
|
||||||
function Database(user, password, dbname, host = 'localhost') {
|
function Database(user, password, dbname, host = 'localhost') {
|
||||||
const db = {};
|
const db = {};
|
||||||
|
db.query = (s, v) => new Promise((res, rej) => {
|
||||||
|
const fn = (e, r) => e ? rej(e) : res(r)
|
||||||
|
v ? db.conn.query(s, v, fn) : db.conn.query(s, fn)
|
||||||
|
});
|
||||||
|
|
||||||
db.createBase = function() {
|
db.createBase = function() {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let statements = []
|
let statements = []
|
||||||
for (t in Base_Tables)
|
for (let t in Base_Tables)
|
||||||
statements.push("CREATE TABLE IF NOT EXISTS " + t + "( " +
|
statements.push("CREATE TABLE IF NOT EXISTS " + t + "( " +
|
||||||
Object.keys(Base_Tables[t]).map(a => a + " " + Base_Tables[t][a]).join(", ") + " )");
|
Object.keys(Base_Tables[t]).map(a => a + " " + Base_Tables[t][a]).join(", ") + " )");
|
||||||
let query = s => new Promise((res, rej) => db.conn.query(s, (e, r) => e ? res(e) : rej(r)));
|
Promise.all(statements.forEach(s => db.query(s)))
|
||||||
Promise.all(statements.forEach(s => query(s)))
|
|
||||||
.then(result => resolve(result))
|
.then(result => resolve(result))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
db.setLastTx = function(id, n) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let statement = "INSERT INTO LastTxs (ID, N) VALUES (?, ?)" +
|
||||||
|
" ON DUPLICATE KEY UPDATE N=?";
|
||||||
|
db.query(statement, [id, n, n])
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
db.setConfig = function(name, value) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let statement = "INSERT INTO Configs (NAME, VAL) VALUES (?, ?)" +
|
||||||
|
" ON DUPLICATE KEY UPDATE VAL=?";
|
||||||
|
db.query(statement, [name, value, value])
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
db.addSuperNode = function(id, pubKey, uri) {
|
||||||
|
let statement = "INSERT INTO SuperNodes (FLO_ID, PUB_KEY, URI) VALUES (?, ?, ?)" +
|
||||||
|
" ON DUPLICATE KEY UPDATE URI=?";
|
||||||
|
db.query(statement, [id, pubKey, uri, uri])
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
db.rmSuperNode = function(id) {
|
||||||
|
let statement = "DELETE FROM SuperNodes" +
|
||||||
|
" WHERE FLO_ID=?";
|
||||||
|
db.query(statement, id)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
db.setSubAdmin = function(appName, subAdmins) {
|
||||||
|
let statement = "UPDATE Applications" +
|
||||||
|
" SET SUB_ADMINS=?" +
|
||||||
|
" WHERE APP_NAME=?";
|
||||||
|
db.query(statement, [subAdmins.join(","), appName])
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
db.addApp = function(appName, adminID) {
|
||||||
|
let statement = "INSERT INTO Applications (APP_NAME, ADMIN_ID) VALUES (?, ?)" +
|
||||||
|
" ON DUPLICATE KEY UPDATE ADMIN_ID=?";
|
||||||
|
db.query(statement, [appName, adminID, adminID])
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
db.rmApp = function(appName) {
|
||||||
|
let statement = "DELETE FROM Applications" +
|
||||||
|
" WHERE APP_NAME=" + appName;
|
||||||
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
}
|
||||||
|
|
||||||
|
db.getBase = function() {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let tables = Object.keys(Base_Tables);
|
||||||
|
Promise.all(tables.forEach(t => db.query("SELECT * FROM " + t))).then(result => {
|
||||||
|
let tmp = Object.fromEntries(tables.map((t, i) => [t, result[i]]))
|
||||||
|
result = {};
|
||||||
|
result.lastTx = Object.fromEntries(tmp.LastTxs.map(a => [a.ID, a.N]));
|
||||||
|
result.sn_config = Object.fromEntries(tmp.Configs.map(a => [a.NAME, a.VAL]));
|
||||||
|
result.appList = Object.fromEntries(tmp.Applications.map(a => [a.APP_NAME, a.ADMIN_ID]));
|
||||||
|
result.appSubAdmins = Object.fromEntries(tmp.Applications.map(a => [a.APP_NAME, a.SUB_ADMINS.split(",")]))
|
||||||
|
result.supernodes = Object.fromEntries(tmp.SuperNodes.map(a.FLO_ID, {
|
||||||
|
pubKey: a.PUB_KEY,
|
||||||
|
uri: a.URI
|
||||||
|
}))
|
||||||
|
resolve(result)
|
||||||
|
}).catch(error => reject(error))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
db.createTable = function(snID) {
|
db.createTable = function(snID) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " +
|
let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " +
|
||||||
@ -92,14 +174,18 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
T_struct.TAG_SIGN + " VARCHAR(160), " +
|
T_struct.TAG_SIGN + " VARCHAR(160), " +
|
||||||
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" +
|
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" +
|
||||||
" )";
|
" )";
|
||||||
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
db.dropTable = function(snID) {
|
db.dropTable = function(snID) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let statement = "DROP TABLE " + snID;
|
let statement = "DROP TABLE " + snID;
|
||||||
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,7 +199,9 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
data = Object.fromEntries(attr.map((a, i) => [
|
data = Object.fromEntries(attr.map((a, i) => [
|
||||||
[a, values[i]]
|
[a, values[i]]
|
||||||
]));
|
]));
|
||||||
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
|
db.query(statement, values)
|
||||||
|
.then(result => resolve(data))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,16 +212,17 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
[T_struct.TAG_TIME]: tagTime,
|
[T_struct.TAG_TIME]: tagTime,
|
||||||
[T_struct.TAG_KEY]: tagKey,
|
[T_struct.TAG_KEY]: tagKey,
|
||||||
[T_struct.TAG_SIGN]: tagSign,
|
[T_struct.TAG_SIGN]: tagSign,
|
||||||
[L_struct.LOG_TIME]: Date.now(),
|
[L_struct.LOG_TIME]: Date.now()
|
||||||
[H_struct.VECTOR_CLOCK]: vectorClock
|
|
||||||
}
|
}
|
||||||
let attr = Object.keys(data);
|
let attr = Object.keys(data);
|
||||||
let values = attr.map(a => data[a]);
|
let values = attr.map(a => data[a]).concat(vectorClock);
|
||||||
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
|
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
|
||||||
let statement = "UPDATE " + snID +
|
let statement = "UPDATE " + snID +
|
||||||
" SET " + attr.map(a => a + "=?").join(", ") +
|
" SET " + attr.map(a => a + "=?").join(", ") +
|
||||||
" WHERE " + H_struct.VECTOR_CLOCK + "=" + vectorClock;
|
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
|
||||||
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
|
db.query(statement, values)
|
||||||
|
.then(result => resolve(data))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -163,18 +252,23 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
conditionArr.push(`${H_struct.SENDER_ID} = '${request.senderID}'`)
|
conditionArr.push(`${H_struct.SENDER_ID} = '${request.senderID}'`)
|
||||||
}
|
}
|
||||||
//console.log(conditionArr);
|
//console.log(conditionArr);
|
||||||
let statement = "SELECT (" + Object.keys(H_struct).join(", ") + ")" +
|
let attr = Object.keys(H_struct).concat(Object.keys(B_struct))
|
||||||
|
let statement = "SELECT (" + attr.join(", ") + ")" +
|
||||||
" FROM " + snID +
|
" FROM " + snID +
|
||||||
" WHERE " + conditionArr.join(" AND ") +
|
" WHERE " + conditionArr.join(" AND ") +
|
||||||
request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK);
|
request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK);
|
||||||
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res));
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
db.lastLogTime = function(snID) {
|
db.lastLogTime = function(snID) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID;
|
let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID;
|
||||||
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res))
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -193,7 +287,9 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
let statement = "SELECT * FROM " + snID +
|
let statement = "SELECT * FROM " + snID +
|
||||||
" WHERE " + L_struct.LOG_TIME + ">=" + logtime +
|
" WHERE " + L_struct.LOG_TIME + ">=" + logtime +
|
||||||
" ORDER BY " + L_struct.LOG_TIME;
|
" ORDER BY " + L_struct.LOG_TIME;
|
||||||
db.conn.query(statement, (err, res) => err ? reject(err) : resolve(res))
|
db.query(statement)
|
||||||
|
.then(result => resolve(result))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,7 +303,9 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
|
" (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " +
|
||||||
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " +
|
"VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " +
|
||||||
"ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", ");
|
"ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", ");
|
||||||
db.conn.query(statement, values.concat(u_values), (err, res) => err ? reject(err) : resolve(res));
|
db.query(statement, values.concat(u_values))
|
||||||
|
.then(result => resolve(data))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +316,9 @@ function Database(user, password, dbname, host = 'localhost') {
|
|||||||
let statement = "UPDATE " + snID +
|
let statement = "UPDATE " + snID +
|
||||||
" SET " + attr.map(a => a + "=?").join(", ") +
|
" SET " + attr.map(a => a + "=?").join(", ") +
|
||||||
" WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK];
|
" WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK];
|
||||||
db.conn.query(statement, values, (err, res) => err ? reject(err) : resolve(data));
|
db.query(statement, values)
|
||||||
|
.then(result => resolve(data))
|
||||||
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
52
src/intra.js
52
src/intra.js
@ -1,7 +1,4 @@
|
|||||||
'use strict';
|
'use strict';
|
||||||
const {
|
|
||||||
stat
|
|
||||||
} = require('fs');
|
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
//CONSTANTS
|
//CONSTANTS
|
||||||
@ -24,8 +21,7 @@ const SUPERNODE_INDICATOR = '$',
|
|||||||
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
|
BACKUP_HANDSHAKE_INIT = "handshakeInitate",
|
||||||
BACKUP_HANDSHAKE_END = "handshakeEnd";
|
BACKUP_HANDSHAKE_END = "handshakeEnd";
|
||||||
|
|
||||||
//const backupNodes = [];
|
var DB //container for database
|
||||||
var backupDepth, supernodeList, appList, kBucket;
|
|
||||||
|
|
||||||
//List of node backups stored
|
//List of node backups stored
|
||||||
const _list = {};
|
const _list = {};
|
||||||
@ -143,31 +139,22 @@ packet_.s = d => [JSON.stringify(d.message), d.time].join("|");
|
|||||||
packet_.parse = function(str) {
|
packet_.parse = function(str) {
|
||||||
let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length))
|
let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length))
|
||||||
let curTime = Date.now();
|
let curTime = Date.now();
|
||||||
if (packet.time > curTime - delayTime &&
|
if (packet.time > curTime - floGlobals.sn_config.delayDelta &&
|
||||||
floCrypto.verifySign(this.s(packet), packet.sign, supernodeList[packet.from].pubKey)) {
|
floCrypto.verifySign(this.s(packet), packet.sign, floGlobals.supernodes[packet.from].pubKey)) {
|
||||||
if (!Array.isArray(packet.message))
|
if (!Array.isArray(packet.message))
|
||||||
packet.message = [packet.message];
|
packet.message = [packet.message];
|
||||||
return packet;
|
return packet;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Set parameters from blockchain
|
|
||||||
function setBlockchainParameters(depth, supernodes, apps, KB, delay) {
|
|
||||||
backupDepth = depth;
|
|
||||||
supernodeList = supernodes;
|
|
||||||
appList = apps;
|
|
||||||
kBucket = KB;
|
|
||||||
delayTime = delay;
|
|
||||||
}
|
|
||||||
|
|
||||||
//-----NODE CONNECTORS (WEBSOCKET)-----
|
//-----NODE CONNECTORS (WEBSOCKET)-----
|
||||||
|
|
||||||
//Connect to Node websocket
|
//Connect to Node websocket
|
||||||
function connectToNode(snID) {
|
function connectToNode(snID) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!(snID in supernodeList))
|
if (!(snID in floGlobals.supernodes))
|
||||||
return reject(`${snID} is not a supernode`)
|
return reject(`${snID} is not a supernode`)
|
||||||
const ws = new WebSocket("wss://" + supernodeList[nextNodeID].uri + "/");
|
const ws = new WebSocket("wss://" + floGlobals.supernodes[nextNodeID].uri + "/");
|
||||||
ws.on("error", () => reject(`${snID} is offline`));
|
ws.on("error", () => reject(`${snID} is offline`));
|
||||||
ws.on('open', () => resolve(ws));
|
ws.on('open', () => resolve(ws));
|
||||||
})
|
})
|
||||||
@ -176,7 +163,7 @@ function connectToNode(snID) {
|
|||||||
//Connect to Node websocket thats online
|
//Connect to Node websocket thats online
|
||||||
function connectToActiveNode(snID, reverse = false) {
|
function connectToActiveNode(snID, reverse = false) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
if (!(snID in supernodeList))
|
if (!(snID in floGlobals.supernodes))
|
||||||
return reject(`${snID} is not a supernode`)
|
return reject(`${snID} is not a supernode`)
|
||||||
if (snID === myFloID)
|
if (snID === myFloID)
|
||||||
return reject(`Reached end of circle. Next node avaiable is self`)
|
return reject(`Reached end of circle. Next node avaiable is self`)
|
||||||
@ -335,7 +322,7 @@ function handshakeMid(id, ws) {
|
|||||||
});
|
});
|
||||||
if (!req_sync.length && !new_order.length)
|
if (!req_sync.length && !new_order.length)
|
||||||
return; //No order change and no need for any data sync
|
return; //No order change and no need for any data sync
|
||||||
Promise.all(req_sync.forEach(n => db.createGetLastLog(n))).then(result => {
|
Promise.all(req_sync.forEach(n => DB.createGetLastLog(n))).then(result => {
|
||||||
let tasks = [];
|
let tasks = [];
|
||||||
if (req_sync.length) {
|
if (req_sync.length) {
|
||||||
tasks.push({
|
tasks.push({
|
||||||
@ -373,8 +360,8 @@ function reconnectNextNode() {
|
|||||||
//Case: No other node is online
|
//Case: No other node is online
|
||||||
console.error(error);
|
console.error(error);
|
||||||
//Serve all nodes
|
//Serve all nodes
|
||||||
for (let sn in supernodeList)
|
for (let sn in floGlobals.supernodes)
|
||||||
db.createTable(sn)
|
DB.createTable(sn)
|
||||||
.then(result => _list[sn] = 0)
|
.then(result => _list[sn] = 0)
|
||||||
.catch(error => console.error(error))
|
.catch(error => console.error(error))
|
||||||
})
|
})
|
||||||
@ -388,8 +375,8 @@ function orderBackup(order) {
|
|||||||
let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID);
|
let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID);
|
||||||
for (let n in order) {
|
for (let n in order) {
|
||||||
if (order[n] + 1 !== _list[n]) {
|
if (order[n] + 1 !== _list[n]) {
|
||||||
if (order[n] >= backupDepth)
|
if (order[n] >= floGlobals.sn_config.backupDepth)
|
||||||
db.dropTable(n).then(_ => null)
|
DB.dropTable(n).then(_ => null)
|
||||||
.catch(error => console.error(error))
|
.catch(error => console.error(error))
|
||||||
.finally(_ => _list.delete(n))
|
.finally(_ => _list.delete(n))
|
||||||
else if (_list[n] !== 0 || !cur_serve.includes(n)) {
|
else if (_list[n] !== 0 || !cur_serve.includes(n)) {
|
||||||
@ -410,7 +397,7 @@ function orderBackup(order) {
|
|||||||
function sendStoredData(lastlogs, node) {
|
function sendStoredData(lastlogs, node) {
|
||||||
for (n in lastlogs) {
|
for (n in lastlogs) {
|
||||||
if (_list.stored.includes(n)) {
|
if (_list.stored.includes(n)) {
|
||||||
db.getData(n, lastlogs[n]).then(result => {
|
DB.getData(n, lastlogs[n]).then(result => {
|
||||||
node.send(packet_.constuct({
|
node.send(packet_.constuct({
|
||||||
type: DATA_SYNC,
|
type: DATA_SYNC,
|
||||||
id: n,
|
id: n,
|
||||||
@ -442,8 +429,9 @@ function dataSyncIndication(snID, status, from) {
|
|||||||
function storeBackupData(data, from, packet) {
|
function storeBackupData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = kBucket.closestNode(data.receiverID);
|
||||||
if (_list.stored.includes(closestNode)) {
|
if (_list.stored.includes(closestNode)) {
|
||||||
db.storeData(closestNode, data);
|
DB.storeData(closestNode, data);
|
||||||
if (_list[closestNode] < backupDepth && _nextNode.id !== from)
|
if (_list[closestNode] < floGlobals.sn_config.backupDepth &&
|
||||||
|
_nextNode.id !== from)
|
||||||
_nextNode.send(packet);
|
_nextNode.send(packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -452,8 +440,9 @@ function storeBackupData(data, from, packet) {
|
|||||||
function tagBackupData(data, from, packet) {
|
function tagBackupData(data, from, packet) {
|
||||||
let closestNode = kBucket.closestNode(data.receiverID);
|
let closestNode = kBucket.closestNode(data.receiverID);
|
||||||
if (_list.stored.includes(closestNode)) {
|
if (_list.stored.includes(closestNode)) {
|
||||||
db.storeTag(closestNode, data);
|
DB.storeTag(closestNode, data);
|
||||||
if (_list[closestNode] < backupDepth && _nextNode.id !== from)
|
if (_list[closestNode] < floGlobals.sn_config.backupDepth &&
|
||||||
|
_nextNode.id !== from)
|
||||||
_nextNode.send(packet);
|
_nextNode.send(packet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -476,5 +465,8 @@ module.exports = {
|
|||||||
processTaskFromSupernode,
|
processTaskFromSupernode,
|
||||||
setBlockchainParameters,
|
setBlockchainParameters,
|
||||||
forwardToNextNode,
|
forwardToNextNode,
|
||||||
SUPERNODE_INDICATOR
|
SUPERNODE_INDICATOR,
|
||||||
|
set DB(db){
|
||||||
|
DB = db
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue
Block a user