diff --git a/src/client.js b/src/client.js index 41123f4..735ca41 100644 --- a/src/client.js +++ b/src/client.js @@ -1,24 +1,32 @@ var DB, _list; //container for database and _list (stored n serving) +global.INVALID = function(message) { + if (!(this instanceof INVALID)) + return new INVALID(message); + this.message = message; +} + function processIncomingData(data) { return new Promise((resolve, reject) => { try { data = JSON.parse(data); } catch (error) { - return reject("Data not in JSON-Format"); + return reject(INVALID("Data not in JSON-Format")); }; let curTime = Date.now(); if (!data.time || data.time > curTime + floGlobals.sn_config.delayDelta || data.time < curTime - floGlobals.sn_config.delayDelta) - return reject("Invalid Time"); + return reject(INVALID("Invalid Time")); else { let process; - if (data.request) //Request + if (request in data) //Request process = processRequestFromUser(data.request); - else if (data.message) //Store data + else if (message in data) //Store data process = processDataFromUser(data); - else if (data) //Tag data + else if (tag in data) //Tag data process = processTagFromUser(data); + else if (note in data) + process = processNoteFromUser(data); /* else if (data.edit) return processEditFromUser(gid, uid, data); @@ -26,7 +34,7 @@ function processIncomingData(data) { return processDeleteFromUser(gid, uid, data); */ else - return reject("Invalid Data-format"); + return reject(INVALID("Invalid Data-format")); process.then(result => { console.debug(result); resolve(result); @@ -41,18 +49,18 @@ function processIncomingData(data) { function processDataFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateAddr(data.receiverID)) - return reject("Invalid receiverID"); + return reject(INVALID("Invalid receiverID")); let closeNode = kBucket.closestNode(data.receiverID); if (!_list.serving.includes(closeNode)) - return reject("Incorrect Supernode"); + return reject(INVALID("Incorrect Supernode")); if (!floCrypto.validateAddr(data.receiverID)) - return reject("Invalid senderID"); + return reject(INVALID("Invalid senderID")); if (data.senderID !== floCrypto.getFloID(data.pubKey)) - return reject("Invalid pubKey"); + return reject(INVALID("Invalid pubKey")); let hashcontent = ["receiverID", "time", "application", "type", "message", "comment"] .map(d => data[d]).join("|"); if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) - return reject("Invalid signature"); + return reject(INVALID("Invalid signature")); DB.addData(closeNode, { vectorClock: `${Date.now()}_${data.senderID}`, @@ -73,10 +81,10 @@ function processDataFromUser(data) { function processRequestFromUser(request) { return new Promise((resolve, reject) => { if (!floCrypto.validateAddr(request.receiverID)) - return reject("Invalid receiverID"); + return reject(INVALID("Invalid receiverID")); let closeNode = kBucket.closestNode(request.receiverID); if (!_list.serving.includes(closeNode)) - return reject("Incorrect Supernode"); + return reject(INVALID("Incorrect Supernode")); DB.searchData(closeNode, request) .then(result => resolve([result])) .catch(error => reject(error)); @@ -86,27 +94,55 @@ function processRequestFromUser(request) { function processTagFromUser(data) { return new Promise((resolve, reject) => { if (!floCrypto.validateAddr(data.receiverID)) - return reject("Invalid receiverID"); - if (!(data.application in floGlobals.appList)) - return reject("Invalid application"); - if (!floCrypto.validateAddr(data.requestorID) || - !floGlobals.appSubAdmins.includes(data.requestorID)) - return reject("Invalid requestorID"); - if (data.requestorID !== floCrypto.getFloID(data.pubKey)) - return reject("Invalid pubKey"); + return reject(INVALID("Invalid receiverID")); let closeNode = kBucket.closestNode(data.receiverID); if (!_list.serving.includes(closeNode)) - return reject("Incorrect Supernode"); - let hashcontent = ["time", "application", "tag"].map(d => data[d]).join("|"); - if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) - return reject("Invalid signature"); - let tag = ([null, undefined, ""].includes(data.tag) ? null : [data.tag].toString()); - DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign) - .then(result => resolve([result, 'TAG'])) - .catch(error => reject(error)); + return reject(INVALID("Incorrect Supernode")); + DB.getData(closeNode, data.vectorClock).then(result => { + if (!(result.application in floGlobals.appList)) + return reject(INVALID("Application not authorised")); + if (!floCrypto.validateAddr(data.requestorID) || + !floGlobals.appSubAdmins[result.application].includes(data.requestorID)) + return reject(INVALID("Invalid requestorID")); + if (data.requestorID !== floCrypto.getFloID(data.pubKey)) + return reject(INVALID("Invalid pubKey")); + let hashcontent = ["time", "vectorClock", "tag"].map(d => data[d]).join("|"); + if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) + return reject(INVALID("Invalid signature")); + let tag = ([null, undefined, ""].includes(data.tag) ? null : data.tag.toString()); + DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign) + .then(result => resolve([result, 'TAG'])) + .catch(error => reject(error)); + }).catch(error => reject(error)) }); }; +function processNoteFromUser(data) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(data.receiverID)) + return reject(INVALID("Invalid receiverID")); + let closeNode = kBucket.closestNode(data.receiverID); + if (!_list.serving.includes(closeNode)) + return reject(INVALID("Incorrect Supernode")); + DB.getData(closeNode, data.vectorClock).then(result => { + if (result.application in floGlobals.appList && floGlobals.appList[result.application] === result.receiverID) { + if (!floGlobals.appSubAdmins[result.application].includes(data.requestorID) && result.receiverID !== data.requestorID) + return reject(INVALID("Invalid requestorID")); + } else if (result.receiverID !== data.requestorID) + return reject(INVALID("Invalid requestorID")); + if (data.requestorID !== floCrypto.getFloID(data.pubKey)) + return reject(INVALID("Invalid pubKey")); + let hashcontent = ["time", "vectorClock", "note"].map(d => data[d]).join("|"); + if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) + return reject(INVALID("Invalid signature")); + let note = ([null, undefined, ""].includes(data.note) ? null : data.note.toString()); + DB.noteData(closeNode, data.vectorClock, note, data.time, data.pubKey, data.sign) + .then(result => resolve([result, 'NOTE'])) + .catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + function checkIfRequestSatisfy(request, data) { if (!request || request.mostRecent || request.receiverID !== data.receiverID) return false; diff --git a/src/database.js b/src/database.js index 99a3934..d070cd8 100644 --- a/src/database.js +++ b/src/database.js @@ -54,6 +54,13 @@ const T_struct = { TAG_SIGN: "tag_sign" }; +const F_struct = { + NOTE: "note", + NOTE_TIME: "note_time", + NOTE_KEY: "note_key", + NOTE_SIGN: "note_sign", +} + function Database(user, password, dbname, host = 'localhost') { const db = {}; @@ -178,6 +185,10 @@ function Database(user, password, dbname, host = 'localhost') { T_struct.TAG_TIME + " BIGINT, " + T_struct.TAG_KEY + " CHAR(66), " + T_struct.TAG_SIGN + " VARCHAR(160), " + + F_struct.NOTE + " TINYTEXT, " + + F_struct.NOTE_TIME + " BIGINT, " + + F_struct.NOTE_KEY + " CHAR(66), " + + F_struct.NOTE_SIGN + " VARCHAR(160), " + "PRIMARY KEY (" + H_struct.VECTOR_CLOCK + ")" + " )"; db.query(statement) @@ -213,6 +224,16 @@ function Database(user, password, dbname, host = 'localhost') { }); }; + db.getData = function(snID, vectorClock) { + return new Promise((resolve, reject) => { + let statement = "SELECT * FROM _" + snID + + "WHERE " + H_struct.VECTOR_CLOCK + "=?"; + db.query(statement, [vectorClock]) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + }; + db.tagData = function(snID, vectorClock, tag, tagTime, tagKey, tagSign) { return new Promise((resolve, reject) => { let data = { @@ -234,6 +255,27 @@ function Database(user, password, dbname, host = 'localhost') { }); }; + db.noteData = function(snID, vectorClock, note, noteTime, noteKey, noteSign) { + return new Promise((resolve, reject) => { + let data = { + [F_struct.NOTE]: note, + [F_struct.NOTE_TIME]: noteTime, + [F_struct.NOTE_KEY]: noteKey, + [F_struct.NOTE_SIGN]: noteSign, + [L_struct.LOG_TIME]: Date.now() + }; + let attr = Object.keys(data); + let values = attr.map(a => data[a]).concat(vectorClock); + data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data + let statement = "UPDATE _" + snID + + " SET " + attr.map(a => a + "=?").join(", ") + + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; + db.query(statement, values) + .then(result => resolve(data)) + .catch(error => reject(error)); + }); + } + db.searchData = function(snID, request) { return new Promise((resolve, reject) => { let conditionArr = []; @@ -291,7 +333,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.getData = function(snID, logtime) { + db.readAllData = function(snID, logtime) { return new Promise((resolve, reject) => { let statement = "SELECT * FROM _" + snID + " WHERE " + L_struct.LOG_TIME + ">" + logtime + @@ -325,16 +367,27 @@ function Database(user, password, dbname, host = 'localhost') { db.storeTag = function(snID, data) { return new Promise((resolve, reject) => { let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME); - let values = attr.map(a => data[a]); + let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]); let statement = "UPDATE _" + snID + " SET " + attr.map(a => a + "=?").join(", ") + - " WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK]; + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; db.query(statement, values) .then(result => resolve(data)) .catch(error => reject(error)); }); }; + db.storeNote = function(snID, data) { + let attr = Object.keys(F_struct).map(a => F_struct[a]).concat(L_struct.LOG_TIME); + let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]); + let statement = "UPDATE _" + snID + + " SET " + attr.map(a => a + "=?").join(", ") + + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; + db.query(statement, values) + .then(result => resolve(data)) + .catch(error => reject(error)); + } + db.deleteData = function(snID, vectorClock) { return new Promise((resolve, reject) => { let statement = "DELETE FROM _" + snID + @@ -355,7 +408,7 @@ function Database(user, password, dbname, host = 'localhost') { H_struct.RECEIVER_ID + " != ? OR " + H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )" : ""); - db.query(statement, [timestamp, app].concat(subAdmins).push(adminID)) + db.query(statement, [timestamp, app, adminID].concat(subAdmins)) .then(result => resolve(result)) .catch(error => reject(error)); }); diff --git a/src/intra.js b/src/intra.js index f1bb0b7..b784327 100644 --- a/src/intra.js +++ b/src/intra.js @@ -10,6 +10,7 @@ const SUPERNODE_INDICATOR = '$', DELETE_MIGRATED_DATA = "migratedDelete", //DELETE_BACKUP_DATA = "backupDelete", TAG_BACKUP_DATA = "backupTag", + NOTE_BACKUP_DATA = "backupNote", //EDIT_BACKUP_DATA = "backupEdit", INITIATE_REFRESH = "initiateRefresh", DATA_REQUEST = "dataRequest", @@ -285,6 +286,9 @@ function processTaskFromPrevNode(packet) { case TAG_BACKUP_DATA: tagBackupData(task.data, from, packet); break; + case NOTE_BACKUP_DATA: + noteBackupData(task.data, from, packet); + break; case DATA_REQUEST: sendStoredData(task.nodes, _prevNode); break; @@ -512,7 +516,7 @@ orderBackup.requestData = function(req_sync, new_order) { function sendStoredData(lastlogs, node) { for (let n in lastlogs) { if (_list.stored.includes(n)) { - DB.getData(n, lastlogs[n]).then(result => { + DB.readAllData(n, lastlogs[n]).then(result => { node.send(packet_.construct({ type: DATA_SYNC, id: n, @@ -560,6 +564,16 @@ function tagBackupData(data, from, packet) { }; }; +//Note (backup) data +function noteBackupData(data, from, packet) { + let closestNode = kBucket.closestNode(data.receiverID); + if (_list.stored.includes(closestNode)) { + DB.storeNote(closestNode, data).then(_ => null).catch(e => console.error(e)); + if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) + _nextNode.send(packet); + }; +}; + //Store (migrated) data function storeMigratedData(data) { let closestNode = kBucket.closestNode(data.receiverID); @@ -590,6 +604,7 @@ function initiateRefresh() { function forwardToNextNode(mode, data) { var modeMap = { 'TAG': TAG_BACKUP_DATA, + 'NOTE': NOTE_BACKUP_DATA, 'DATA': STORE_BACKUP_DATA }; if (mode in modeMap && _nextNode.id) @@ -648,7 +663,7 @@ dataMigration.process_del = async function(del_nodes, old_kb) { connectToAllActiveNodes().then(ws_connections => { let remaining = process_nodes.length; process_nodes.forEach(n => { - DB.getData(n, 0).then(result => { + DB.readAllData(n, 0).then(result => { console.log(`START: Data migration for ${n}`); //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => { @@ -698,7 +713,7 @@ dataMigration.process_new = async function(new_nodes) { let process_nodes = _list.serving, remaining = process_nodes.length; process_nodes.forEach(n => { - DB.getData(n, 0).then(result => { + DB.readAllData(n, 0).then(result => { //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => { let closest = kBucket.closestNode(d.receiverID); diff --git a/src/main.js b/src/main.js index 9f3da52..5c2dd83 100644 --- a/src/main.js +++ b/src/main.js @@ -241,7 +241,7 @@ function selfDiskMigration(node_change) { disks.forEach(n => { if (node_change[n] === false) DB.dropTable(n).then(_ => null).catch(e => console.error(e)); - DB.getData(n, 0).then(result => { + DB.readAllData(n, 0).then(result => { result.forEach(d => { let closest = kBucket.closestNode(d.receiverID); if (closest !== n) diff --git a/src/server.js b/src/server.js index 1862844..70c0b16 100644 --- a/src/server.js +++ b/src/server.js @@ -2,6 +2,9 @@ const http = require('http'); const WebSocket = require('ws'); const url = require('url'); +const INVALID_E_CODE = 400, + INTERNAL_E_CODE = 500; + module.exports = function Server(port, client, intra) { var refresher; //container for refresher @@ -16,14 +19,21 @@ module.exports = function Server(port, client, intra) { console.debug("GET:", u.search); client.processRequestFromUser(u.query) .then(result => res.end(JSON.stringify(result[0]))) - .catch(error => res.end(error.toString())); + .catch(error => { + if (error instanceof INVALID) + res.writeHead(INVALID_E_CODE).end(error.message); + else { + console.error(error); + res.writeHead(INTERNAL_E_CODE).end("Unable to process request"); + } + }); } else if (req.method === "POST") { //POST: All data processing (required JSON input) let data = ''; req.on('data', chunk => data += chunk); req.on('end', () => { console.debug("POST:", data); - //process the data storing + //process the all data request types client.processIncomingData(data).then(result => { res.end(JSON.stringify(result[0])); if (result[1]) { @@ -32,7 +42,14 @@ module.exports = function Server(port, client, intra) { sendToLiveRequests(result[0]); intra.forwardToNextNode(result[1], result[0]); }; - }).catch(error => res.end(error.toString())); + }).catch(error => { + if (error instanceof INVALID) + res.writeHead(INVALID_E_CODE).end(error.message); + else { + console.error(error); + res.writeHead(INTERNAL_E_CODE).end("Unable to process request"); + } + }); }); }; }); @@ -60,13 +77,19 @@ module.exports = function Server(port, client, intra) { ws.send(JSON.stringify(result[0])); ws._liveReq = request; }).catch(error => { - if (floGlobals.sn_config.errorFeedback) - ws.send(error.toString()); + if (floGlobals.sn_config.errorFeedback) { + if (error instanceof INVALID) + ws.send(`${INVALID_E_CODE}: ${error.message}`); + else { + console.error(error); + ws.send(`${INTERNAL_E_CODE}: Unable to process request`); + } + } }); } catch (error) { console.error(error); if (floGlobals.sn_config.errorFeedback) - ws.send("Request not in JSON format"); + ws.send(`${INVALID_E_CODE}: Request not in JSON format`); }; }; };