New feature: Note And some other improvements
Added new feature 'Note': - Receivers can add a tiny note to data received. - For data received by adminID of authorised app, subAdmins will also be able to add/edit note Improved Invalid error and Internal error feedback: - Only Invalid errors are relayed to user ( Invalid error code: 400). - Internal errors caused just indicate 'unable to process request' to user (Internal error code: 500) Other changes: - Processing Tag will now check application from data stored. application parameter is not required (Signature will require vectorClock instead of application). - Fixed bug: clear-authorized-data using incorrect floID values
This commit is contained in:
parent
312e24629f
commit
52571ccf8b
@ -1,24 +1,32 @@
|
||||
var DB, _list; //container for database and _list (stored n serving)
|
||||
|
||||
global.INVALID = function(message) {
|
||||
if (!(this instanceof INVALID))
|
||||
return new INVALID(message);
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
function processIncomingData(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
try {
|
||||
data = JSON.parse(data);
|
||||
} catch (error) {
|
||||
return reject("Data not in JSON-Format");
|
||||
return reject(INVALID("Data not in JSON-Format"));
|
||||
};
|
||||
let curTime = Date.now();
|
||||
if (!data.time || data.time > curTime + floGlobals.sn_config.delayDelta ||
|
||||
data.time < curTime - floGlobals.sn_config.delayDelta)
|
||||
return reject("Invalid Time");
|
||||
return reject(INVALID("Invalid Time"));
|
||||
else {
|
||||
let process;
|
||||
if (data.request) //Request
|
||||
if (request in data) //Request
|
||||
process = processRequestFromUser(data.request);
|
||||
else if (data.message) //Store data
|
||||
else if (message in data) //Store data
|
||||
process = processDataFromUser(data);
|
||||
else if (data) //Tag data
|
||||
else if (tag in data) //Tag data
|
||||
process = processTagFromUser(data);
|
||||
else if (note in data)
|
||||
process = processNoteFromUser(data);
|
||||
/*
|
||||
else if (data.edit)
|
||||
return processEditFromUser(gid, uid, data);
|
||||
@ -26,7 +34,7 @@ function processIncomingData(data) {
|
||||
return processDeleteFromUser(gid, uid, data);
|
||||
*/
|
||||
else
|
||||
return reject("Invalid Data-format");
|
||||
return reject(INVALID("Invalid Data-format"));
|
||||
process.then(result => {
|
||||
console.debug(result);
|
||||
resolve(result);
|
||||
@ -41,18 +49,18 @@ function processIncomingData(data) {
|
||||
function processDataFromUser(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!floCrypto.validateAddr(data.receiverID))
|
||||
return reject("Invalid receiverID");
|
||||
return reject(INVALID("Invalid receiverID"));
|
||||
let closeNode = kBucket.closestNode(data.receiverID);
|
||||
if (!_list.serving.includes(closeNode))
|
||||
return reject("Incorrect Supernode");
|
||||
return reject(INVALID("Incorrect Supernode"));
|
||||
if (!floCrypto.validateAddr(data.receiverID))
|
||||
return reject("Invalid senderID");
|
||||
return reject(INVALID("Invalid senderID"));
|
||||
if (data.senderID !== floCrypto.getFloID(data.pubKey))
|
||||
return reject("Invalid pubKey");
|
||||
return reject(INVALID("Invalid pubKey"));
|
||||
let hashcontent = ["receiverID", "time", "application", "type", "message", "comment"]
|
||||
.map(d => data[d]).join("|");
|
||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||
return reject("Invalid signature");
|
||||
return reject(INVALID("Invalid signature"));
|
||||
|
||||
DB.addData(closeNode, {
|
||||
vectorClock: `${Date.now()}_${data.senderID}`,
|
||||
@ -73,10 +81,10 @@ function processDataFromUser(data) {
|
||||
function processRequestFromUser(request) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!floCrypto.validateAddr(request.receiverID))
|
||||
return reject("Invalid receiverID");
|
||||
return reject(INVALID("Invalid receiverID"));
|
||||
let closeNode = kBucket.closestNode(request.receiverID);
|
||||
if (!_list.serving.includes(closeNode))
|
||||
return reject("Incorrect Supernode");
|
||||
return reject(INVALID("Incorrect Supernode"));
|
||||
DB.searchData(closeNode, request)
|
||||
.then(result => resolve([result]))
|
||||
.catch(error => reject(error));
|
||||
@ -86,27 +94,55 @@ function processRequestFromUser(request) {
|
||||
function processTagFromUser(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!floCrypto.validateAddr(data.receiverID))
|
||||
return reject("Invalid receiverID");
|
||||
if (!(data.application in floGlobals.appList))
|
||||
return reject("Invalid application");
|
||||
if (!floCrypto.validateAddr(data.requestorID) ||
|
||||
!floGlobals.appSubAdmins.includes(data.requestorID))
|
||||
return reject("Invalid requestorID");
|
||||
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
|
||||
return reject("Invalid pubKey");
|
||||
return reject(INVALID("Invalid receiverID"));
|
||||
let closeNode = kBucket.closestNode(data.receiverID);
|
||||
if (!_list.serving.includes(closeNode))
|
||||
return reject("Incorrect Supernode");
|
||||
let hashcontent = ["time", "application", "tag"].map(d => data[d]).join("|");
|
||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||
return reject("Invalid signature");
|
||||
let tag = ([null, undefined, ""].includes(data.tag) ? null : [data.tag].toString());
|
||||
DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign)
|
||||
.then(result => resolve([result, 'TAG']))
|
||||
.catch(error => reject(error));
|
||||
return reject(INVALID("Incorrect Supernode"));
|
||||
DB.getData(closeNode, data.vectorClock).then(result => {
|
||||
if (!(result.application in floGlobals.appList))
|
||||
return reject(INVALID("Application not authorised"));
|
||||
if (!floCrypto.validateAddr(data.requestorID) ||
|
||||
!floGlobals.appSubAdmins[result.application].includes(data.requestorID))
|
||||
return reject(INVALID("Invalid requestorID"));
|
||||
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
|
||||
return reject(INVALID("Invalid pubKey"));
|
||||
let hashcontent = ["time", "vectorClock", "tag"].map(d => data[d]).join("|");
|
||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||
return reject(INVALID("Invalid signature"));
|
||||
let tag = ([null, undefined, ""].includes(data.tag) ? null : data.tag.toString());
|
||||
DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign)
|
||||
.then(result => resolve([result, 'TAG']))
|
||||
.catch(error => reject(error));
|
||||
}).catch(error => reject(error))
|
||||
});
|
||||
};
|
||||
|
||||
function processNoteFromUser(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!floCrypto.validateAddr(data.receiverID))
|
||||
return reject(INVALID("Invalid receiverID"));
|
||||
let closeNode = kBucket.closestNode(data.receiverID);
|
||||
if (!_list.serving.includes(closeNode))
|
||||
return reject(INVALID("Incorrect Supernode"));
|
||||
DB.getData(closeNode, data.vectorClock).then(result => {
|
||||
if (result.application in floGlobals.appList && floGlobals.appList[result.application] === result.receiverID) {
|
||||
if (!floGlobals.appSubAdmins[result.application].includes(data.requestorID) && result.receiverID !== data.requestorID)
|
||||
return reject(INVALID("Invalid requestorID"));
|
||||
} else if (result.receiverID !== data.requestorID)
|
||||
return reject(INVALID("Invalid requestorID"));
|
||||
if (data.requestorID !== floCrypto.getFloID(data.pubKey))
|
||||
return reject(INVALID("Invalid pubKey"));
|
||||
let hashcontent = ["time", "vectorClock", "note"].map(d => data[d]).join("|");
|
||||
if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey))
|
||||
return reject(INVALID("Invalid signature"));
|
||||
let note = ([null, undefined, ""].includes(data.note) ? null : data.note.toString());
|
||||
DB.noteData(closeNode, data.vectorClock, note, data.time, data.pubKey, data.sign)
|
||||
.then(result => resolve([result, 'NOTE']))
|
||||
.catch(error => reject(error));
|
||||
}).catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
function checkIfRequestSatisfy(request, data) {
|
||||
if (!request || request.mostRecent || request.receiverID !== data.receiverID)
|
||||
return false;
|
||||
|
||||
@ -54,6 +54,13 @@ const T_struct = {
|
||||
TAG_SIGN: "tag_sign"
|
||||
};
|
||||
|
||||
const F_struct = {
|
||||
NOTE: "note",
|
||||
NOTE_TIME: "note_time",
|
||||
NOTE_KEY: "note_key",
|
||||
NOTE_SIGN: "note_sign",
|
||||
}
|
||||
|
||||
function Database(user, password, dbname, host = 'localhost') {
|
||||
const db = {};
|
||||
|
||||
@ -178,6 +185,10 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
T_struct.TAG_TIME + " BIGINT, " +
|
||||
T_struct.TAG_KEY + " CHAR(66), " +
|
||||
T_struct.TAG_SIGN + " VARCHAR(160), " +
|
||||
F_struct.NOTE + " TINYTEXT, " +
|
||||
F_struct.NOTE_TIME + " BIGINT, " +
|
||||
F_struct.NOTE_KEY + " CHAR(66), " +
|
||||
F_struct.NOTE_SIGN + " VARCHAR(160), " +
|
||||
"PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" +
|
||||
" )";
|
||||
db.query(statement)
|
||||
@ -213,6 +224,16 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
});
|
||||
};
|
||||
|
||||
db.getData = function(snID, vectorClock) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let statement = "SELECT * FROM _" + snID +
|
||||
"WHERE " + H_struct.VECTOR_CLOCK + "=?";
|
||||
db.query(statement, [vectorClock])
|
||||
.then(result => resolve(result))
|
||||
.catch(error => reject(error))
|
||||
})
|
||||
};
|
||||
|
||||
db.tagData = function(snID, vectorClock, tag, tagTime, tagKey, tagSign) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let data = {
|
||||
@ -234,6 +255,27 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
});
|
||||
};
|
||||
|
||||
db.noteData = function(snID, vectorClock, note, noteTime, noteKey, noteSign) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let data = {
|
||||
[F_struct.NOTE]: note,
|
||||
[F_struct.NOTE_TIME]: noteTime,
|
||||
[F_struct.NOTE_KEY]: noteKey,
|
||||
[F_struct.NOTE_SIGN]: noteSign,
|
||||
[L_struct.LOG_TIME]: Date.now()
|
||||
};
|
||||
let attr = Object.keys(data);
|
||||
let values = attr.map(a => data[a]).concat(vectorClock);
|
||||
data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data
|
||||
let statement = "UPDATE _" + snID +
|
||||
" SET " + attr.map(a => a + "=?").join(", ") +
|
||||
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
|
||||
db.query(statement, values)
|
||||
.then(result => resolve(data))
|
||||
.catch(error => reject(error));
|
||||
});
|
||||
}
|
||||
|
||||
db.searchData = function(snID, request) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let conditionArr = [];
|
||||
@ -291,7 +333,7 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
});
|
||||
};
|
||||
|
||||
db.getData = function(snID, logtime) {
|
||||
db.readAllData = function(snID, logtime) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let statement = "SELECT * FROM _" + snID +
|
||||
" WHERE " + L_struct.LOG_TIME + ">" + logtime +
|
||||
@ -325,16 +367,27 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
db.storeTag = function(snID, data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME);
|
||||
let values = attr.map(a => data[a]);
|
||||
let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]);
|
||||
let statement = "UPDATE _" + snID +
|
||||
" SET " + attr.map(a => a + "=?").join(", ") +
|
||||
" WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK];
|
||||
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
|
||||
db.query(statement, values)
|
||||
.then(result => resolve(data))
|
||||
.catch(error => reject(error));
|
||||
});
|
||||
};
|
||||
|
||||
db.storeNote = function(snID, data) {
|
||||
let attr = Object.keys(F_struct).map(a => F_struct[a]).concat(L_struct.LOG_TIME);
|
||||
let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]);
|
||||
let statement = "UPDATE _" + snID +
|
||||
" SET " + attr.map(a => a + "=?").join(", ") +
|
||||
" WHERE " + H_struct.VECTOR_CLOCK + "=?";
|
||||
db.query(statement, values)
|
||||
.then(result => resolve(data))
|
||||
.catch(error => reject(error));
|
||||
}
|
||||
|
||||
db.deleteData = function(snID, vectorClock) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let statement = "DELETE FROM _" + snID +
|
||||
@ -355,7 +408,7 @@ function Database(user, password, dbname, host = 'localhost') {
|
||||
H_struct.RECEIVER_ID + " != ? OR " +
|
||||
H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )" :
|
||||
"");
|
||||
db.query(statement, [timestamp, app].concat(subAdmins).push(adminID))
|
||||
db.query(statement, [timestamp, app, adminID].concat(subAdmins))
|
||||
.then(result => resolve(result))
|
||||
.catch(error => reject(error));
|
||||
});
|
||||
|
||||
21
src/intra.js
21
src/intra.js
@ -10,6 +10,7 @@ const SUPERNODE_INDICATOR = '$',
|
||||
DELETE_MIGRATED_DATA = "migratedDelete",
|
||||
//DELETE_BACKUP_DATA = "backupDelete",
|
||||
TAG_BACKUP_DATA = "backupTag",
|
||||
NOTE_BACKUP_DATA = "backupNote",
|
||||
//EDIT_BACKUP_DATA = "backupEdit",
|
||||
INITIATE_REFRESH = "initiateRefresh",
|
||||
DATA_REQUEST = "dataRequest",
|
||||
@ -285,6 +286,9 @@ function processTaskFromPrevNode(packet) {
|
||||
case TAG_BACKUP_DATA:
|
||||
tagBackupData(task.data, from, packet);
|
||||
break;
|
||||
case NOTE_BACKUP_DATA:
|
||||
noteBackupData(task.data, from, packet);
|
||||
break;
|
||||
case DATA_REQUEST:
|
||||
sendStoredData(task.nodes, _prevNode);
|
||||
break;
|
||||
@ -512,7 +516,7 @@ orderBackup.requestData = function(req_sync, new_order) {
|
||||
function sendStoredData(lastlogs, node) {
|
||||
for (let n in lastlogs) {
|
||||
if (_list.stored.includes(n)) {
|
||||
DB.getData(n, lastlogs[n]).then(result => {
|
||||
DB.readAllData(n, lastlogs[n]).then(result => {
|
||||
node.send(packet_.construct({
|
||||
type: DATA_SYNC,
|
||||
id: n,
|
||||
@ -560,6 +564,16 @@ function tagBackupData(data, from, packet) {
|
||||
};
|
||||
};
|
||||
|
||||
//Note (backup) data
|
||||
function noteBackupData(data, from, packet) {
|
||||
let closestNode = kBucket.closestNode(data.receiverID);
|
||||
if (_list.stored.includes(closestNode)) {
|
||||
DB.storeNote(closestNode, data).then(_ => null).catch(e => console.error(e));
|
||||
if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from)
|
||||
_nextNode.send(packet);
|
||||
};
|
||||
};
|
||||
|
||||
//Store (migrated) data
|
||||
function storeMigratedData(data) {
|
||||
let closestNode = kBucket.closestNode(data.receiverID);
|
||||
@ -590,6 +604,7 @@ function initiateRefresh() {
|
||||
function forwardToNextNode(mode, data) {
|
||||
var modeMap = {
|
||||
'TAG': TAG_BACKUP_DATA,
|
||||
'NOTE': NOTE_BACKUP_DATA,
|
||||
'DATA': STORE_BACKUP_DATA
|
||||
};
|
||||
if (mode in modeMap && _nextNode.id)
|
||||
@ -648,7 +663,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) {
|
||||
connectToAllActiveNodes().then(ws_connections => {
|
||||
let remaining = process_nodes.length;
|
||||
process_nodes.forEach(n => {
|
||||
DB.getData(n, 0).then(result => {
|
||||
DB.readAllData(n, 0).then(result => {
|
||||
console.log(`START: Data migration for ${n}`);
|
||||
//TODO: efficiently handle large number of data instead of loading all into memory
|
||||
result.forEach(d => {
|
||||
@ -698,7 +713,7 @@ dataMigration.process_new = async function(new_nodes) {
|
||||
let process_nodes = _list.serving,
|
||||
remaining = process_nodes.length;
|
||||
process_nodes.forEach(n => {
|
||||
DB.getData(n, 0).then(result => {
|
||||
DB.readAllData(n, 0).then(result => {
|
||||
//TODO: efficiently handle large number of data instead of loading all into memory
|
||||
result.forEach(d => {
|
||||
let closest = kBucket.closestNode(d.receiverID);
|
||||
|
||||
@ -241,7 +241,7 @@ function selfDiskMigration(node_change) {
|
||||
disks.forEach(n => {
|
||||
if (node_change[n] === false)
|
||||
DB.dropTable(n).then(_ => null).catch(e => console.error(e));
|
||||
DB.getData(n, 0).then(result => {
|
||||
DB.readAllData(n, 0).then(result => {
|
||||
result.forEach(d => {
|
||||
let closest = kBucket.closestNode(d.receiverID);
|
||||
if (closest !== n)
|
||||
|
||||
@ -2,6 +2,9 @@ const http = require('http');
|
||||
const WebSocket = require('ws');
|
||||
const url = require('url');
|
||||
|
||||
const INVALID_E_CODE = 400,
|
||||
INTERNAL_E_CODE = 500;
|
||||
|
||||
module.exports = function Server(port, client, intra) {
|
||||
|
||||
var refresher; //container for refresher
|
||||
@ -16,14 +19,21 @@ module.exports = function Server(port, client, intra) {
|
||||
console.debug("GET:", u.search);
|
||||
client.processRequestFromUser(u.query)
|
||||
.then(result => res.end(JSON.stringify(result[0])))
|
||||
.catch(error => res.end(error.toString()));
|
||||
.catch(error => {
|
||||
if (error instanceof INVALID)
|
||||
res.writeHead(INVALID_E_CODE).end(error.message);
|
||||
else {
|
||||
console.error(error);
|
||||
res.writeHead(INTERNAL_E_CODE).end("Unable to process request");
|
||||
}
|
||||
});
|
||||
} else if (req.method === "POST") {
|
||||
//POST: All data processing (required JSON input)
|
||||
let data = '';
|
||||
req.on('data', chunk => data += chunk);
|
||||
req.on('end', () => {
|
||||
console.debug("POST:", data);
|
||||
//process the data storing
|
||||
//process the all data request types
|
||||
client.processIncomingData(data).then(result => {
|
||||
res.end(JSON.stringify(result[0]));
|
||||
if (result[1]) {
|
||||
@ -32,7 +42,14 @@ module.exports = function Server(port, client, intra) {
|
||||
sendToLiveRequests(result[0]);
|
||||
intra.forwardToNextNode(result[1], result[0]);
|
||||
};
|
||||
}).catch(error => res.end(error.toString()));
|
||||
}).catch(error => {
|
||||
if (error instanceof INVALID)
|
||||
res.writeHead(INVALID_E_CODE).end(error.message);
|
||||
else {
|
||||
console.error(error);
|
||||
res.writeHead(INTERNAL_E_CODE).end("Unable to process request");
|
||||
}
|
||||
});
|
||||
});
|
||||
};
|
||||
});
|
||||
@ -60,13 +77,19 @@ module.exports = function Server(port, client, intra) {
|
||||
ws.send(JSON.stringify(result[0]));
|
||||
ws._liveReq = request;
|
||||
}).catch(error => {
|
||||
if (floGlobals.sn_config.errorFeedback)
|
||||
ws.send(error.toString());
|
||||
if (floGlobals.sn_config.errorFeedback) {
|
||||
if (error instanceof INVALID)
|
||||
ws.send(`${INVALID_E_CODE}: ${error.message}`);
|
||||
else {
|
||||
console.error(error);
|
||||
ws.send(`${INTERNAL_E_CODE}: Unable to process request`);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
if (floGlobals.sn_config.errorFeedback)
|
||||
ws.send("Request not in JSON format");
|
||||
ws.send(`${INVALID_E_CODE}: Request not in JSON format`);
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
Loading…
Reference in New Issue
Block a user