From bb21b1c436733fd28ad848ccb97a19bbee5c6ac0 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 22 Jul 2021 01:50:20 +0530 Subject: [PATCH] update supernode modules - Auto Disk cleanup - Data migration (node addition and node deletion) - Some minor bugs and typo fixes --- src/client.js | 3 +- src/database.js | 49 ++++++--- src/intra.js | 274 +++++++++++++++++++++++++++++++++++++++++++----- src/start.js | 46 +++++--- 4 files changed, 322 insertions(+), 50 deletions(-) diff --git a/src/client.js b/src/client.js index 974de33..cf9beed 100644 --- a/src/client.js +++ b/src/client.js @@ -96,7 +96,8 @@ function processTagFromUser(data) { .map(d => data[d]).join("|"); if (!floCrypto.verifySign(hashcontent, data.sign, data.pubKey)) return reject("Invalid signature"); - DB.tagData(closeNode, data.vectorClock, data.tag, data.time, data.pubKey, data.sign) + let tag = [null, undefined, ""].includes(data.tag) ? null : [data.tag].toString(); + DB.tagData(closeNode, data.vectorClock, tag, data.time, data.pubKey, data.sign) .then(result => resolve([result, 'TAG'])) .catch(error => reject(error)) }) diff --git a/src/database.js b/src/database.js index 7b2930e..f62d3bf 100644 --- a/src/database.js +++ b/src/database.js @@ -31,14 +31,14 @@ const H_struct = { SENDER_ID: "senderID", RECEIVER_ID: "receiverID", TYPE: "type", - APPLICATION: "application" + APPLICATION: "application", + TIME: "time", + PUB_KEY: "pubKey" } const B_struct = { - TIME: "time", MESSAGE: "message", SIGNATURE: "sign", - PUB_KEY: "pubKey", COMMENT: "comment" } @@ -155,7 +155,7 @@ function Database(user, password, dbname, host = 'localhost') { db.createTable = function(snID) { return new Promise((resolve, reject) => { - let statement = "CREATE TABLE IF NOT EXISTS " + snID + " ( " + + let statement = "CREATE TABLE IF NOT EXISTS _" + snID + " ( " + H_struct.VECTOR_CLOCK + " VARCHAR(50) NOT NULL, " + H_struct.SENDER_ID + " CHAR(34) NOT NULL, " + H_struct.RECEIVER_ID + " CHAR(34) NOT NULL, " + @@ -182,7 +182,7 @@ function Database(user, password, dbname, host = 'localhost') { db.dropTable = function(snID) { return new Promise((resolve, reject) => { - let statement = "DROP TABLE " + snID; + let statement = "DROP TABLE _" + snID; db.query(statement) .then(result => resolve(result)) .catch(error => reject(error)) @@ -193,7 +193,7 @@ function Database(user, password, dbname, host = 'localhost') { return new Promise((resolve, reject) => { let attr = Object.keys(H_struct).map(a => H_struct[a]); let values = attr.map(a => data[a]); - let statement = "INSERT INTO " + snID + + let statement = "INSERT INTO _" + snID + " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")"; data = Object.fromEntries(attr.map((a, i) => [ @@ -217,7 +217,7 @@ function Database(user, password, dbname, host = 'localhost') { let attr = Object.keys(data); let values = attr.map(a => data[a]).concat(vectorClock); data[H_struct.VECTOR_CLOCK] = vectorClock; //also add vectorClock to resolve data - let statement = "UPDATE " + snID + + let statement = "UPDATE _" + snID + " SET " + attr.map(a => a + "=?").join(", ") + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; db.query(statement, values) @@ -254,7 +254,7 @@ function Database(user, password, dbname, host = 'localhost') { //console.log(conditionArr); let attr = Object.keys(H_struct).concat(Object.keys(B_struct)) let statement = "SELECT (" + attr.join(", ") + ")" + - " FROM " + snID + + " FROM _" + snID + " WHERE " + conditionArr.join(" AND ") + request.mostRecent ? "LIMIT 1" : (" ORDER BY " + H_struct.VECTOR_CLOCK); db.query(statement) @@ -265,7 +265,7 @@ function Database(user, password, dbname, host = 'localhost') { db.lastLogTime = function(snID) { return new Promise((resolve, reject) => { - let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM " + snID; + let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM _" + snID; db.query(statement) .then(result => resolve(result)) .catch(error => reject(error)) @@ -284,7 +284,7 @@ function Database(user, password, dbname, host = 'localhost') { db.getData = function(snID, logtime) { return new Promise((resolve, reject) => { - let statement = "SELECT * FROM " + snID + + let statement = "SELECT * FROM _" + snID + " WHERE " + L_struct.LOG_TIME + ">=" + logtime + " ORDER BY " + L_struct.LOG_TIME; db.query(statement) @@ -299,7 +299,7 @@ function Database(user, password, dbname, host = 'localhost') { let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(u_attr); let values = attr.map(a => data[a]); let u_values = u_attr.map(a => data[a]); - let statement = "INSERT INTO " + snID + + let statement = "INSERT INTO _" + snID + " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " + "ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", "); @@ -313,7 +313,7 @@ function Database(user, password, dbname, host = 'localhost') { return new Promise((resolve, reject) => { let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME); let values = attr.map(a => data[a]); - let statement = "UPDATE " + snID + + let statement = "UPDATE _" + snID + " SET " + attr.map(a => a + "=?").join(", ") + " WHERE " + H_struct.VECTOR_CLOCK + "=" + data[H_struct.VECTOR_CLOCK]; db.query(statement, values) @@ -322,6 +322,31 @@ function Database(user, password, dbname, host = 'localhost') { }) } + db.clearAuthorisedAppData = function(snID, app, adminID, subAdmins, timestamp) { + return new Promise((resolve, reject) => { + let statement = "DELETE FROM _" + snID + + " WHERE ( " + H_struct.TIME + " "?").join(", ") + ") )"; + db.query(statement, [timestamp, app].concat(subAdmins).push(adminID)) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + } + + db.clearUnauthorisedAppData = function(snID, appList, timestamp) { + return new Promise((resolve, reject) => { + let statement = "DELETE FROM _" + snID + + "WHERE " + H_struct.TIME + " "?").join(", ") + ")"; + db.query(statement, [timestamp].concat(appList)) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) + } + db.close = function() { db.conn.end(); } diff --git a/src/intra.js b/src/intra.js index 2485d36..e7a1b7f 100644 --- a/src/intra.js +++ b/src/intra.js @@ -6,7 +6,8 @@ const SUPERNODE_INDICATOR = '$', //Message type ORDER_BACKUP = "orderBackup", STORE_BACKUP_DATA = "backupData", - //STORE_MIGRATED_DATA = "migratedData", + STORE_MIGRATED_DATA = "migratedData", + DELETE_MIGRATED_DATA = "migratedDelete", //DELETE_BACKUP_DATA = "backupDelete", TAG_BACKUP_DATA = "backupTag", //EDIT_BACKUP_DATA = "backupEdit", @@ -14,9 +15,11 @@ const SUPERNODE_INDICATOR = '$', DATA_REQUEST = "dataRequest", DATA_SYNC = "dataSync", BACKUP_HANDSHAKE_INIT = "handshakeInitate", - BACKUP_HANDSHAKE_END = "handshakeEnd"; + BACKUP_HANDSHAKE_END = "handshakeEnd", + RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins + MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins -var DB //container for database +var DB, refresher //container for database and refresher //List of node backups stored const _list = {}; @@ -135,6 +138,7 @@ packet_.parse = function(str) { let packet = JSON.parse(str.substring(SUPERNODE_INDICATOR.length)) let curTime = Date.now(); if (packet.time > curTime - floGlobals.sn_config.delayDelta && + packet.from in floGlobals.supernodes && floCrypto.verifySign(this.s(packet), packet.sign, floGlobals.supernodes[packet.from].pubKey)) { if (!Array.isArray(packet.message)) packet.message = [packet.message]; @@ -187,6 +191,31 @@ function connectToNextNode() { }) } +function connectToAliveNodes(nodes = null) { + if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes); + return new Promise((resolve, reject) => { + Promise.allSettled(nodes.map(n => connectToNode(n))).then(results => { + let ws_connections = {}; + nodes.forEach((n, i) => + ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null); + resolve(ws_connections); + }).catch(error => reject(error)) + }) +} + +//Connect to all given nodes [Array] (Default: All super-nodes) +function connectToAllActiveNodes(nodes = null) { + if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes); + return new Promise((resolve, reject) => { + Promise.allSettled(nodes.map(n => connectToActiveNode(n))).then(results => { + let ws_connections = {}; + nodes.forEach((n, i) => + ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null); + resolve(ws_connections); + }).catch(error => reject(error)) + }) +} + //-----PROCESS TASKS----- //Tasks from next-node @@ -213,6 +242,8 @@ function processTaskFromNextNode(packet) { case STORE_BACKUP_DATA: storeBackupData(task.data) break; + default: + console.log("Invalid task type:" + task.type + "from next-node") } }) } @@ -242,9 +273,11 @@ function processTaskFromPrevNode(packet) { case DATA_SYNC: dataSyncIndication(task.id, task.status, from) break; - case INITIATE_REFRESH: //TODO - initiateRefresh() + case DELETE_MIGRATED_DATA: + deleteMigratedData(task.data, from, packet) break; + default: + console.log("Invalid task type:" + task.type + "from prev-node") } }); } @@ -262,11 +295,14 @@ function processTaskFromSupernode(packet, ws) { case BACKUP_HANDSHAKE_INIT: handshakeMid(from, ws) break; - /* - case STORE_MIGRATED_DATA: //TODO + case STORE_MIGRATED_DATA: storeMigratedData(task.data) break; - */ + case INITIATE_REFRESH: + initiateRefresh() + break; + default: + console.log("Invalid task type:" + task.type + "from super-node") } }); } @@ -276,7 +312,7 @@ function processTaskFromSupernode(packet, ws) { //Acknowledge handshake function handshakeMid(id, ws) { - if (_prevNode.id) { + if (_prevNode.id && _prevNode.id in floGlobals.supernodes) { if (kBucket.innerNodes(_prevNode.id, myFloID).includes(id)) { //close existing prev-node connection _prevNode.send(packet_.constuct({ @@ -319,24 +355,48 @@ function handshakeMid(id, ws) { }); if (!req_sync.length && !new_order.length) return; //No order change and no need for any data sync - Promise.all(req_sync.forEach(n => DB.createGetLastLog(n))).then(result => { - let tasks = []; - if (req_sync.length) { + else + handshakeMid.requestData(req_sync, new_order); +} + +handshakeMid.requestData = function(req_sync, new_order) { + if (handshakeMid.timeout) { + clearTimeout(handshakeMid.timeout) + delete handshakeMid.timeout; + } + Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => { + let tasks = [], + lastlogs = {}, + failed = [], + order = [], + failed_order = []; + + req_sync.forEach((s, i) => { + if (result[i].status === "fulfilled") + lastlogs[s] = result[i].value; + else + failed.push(s) + }); + if (Object.keys(lastlogs).length) tasks.push({ type: DATA_REQUEST, - nodes: Object.fromEntries(req_sync.map((k, i) => [k, result[i]])) - }) - } - if (new_order.length) { + nodes: lastlogs + }); + new_order.forEach(n => { + if (failed.includes(n)) + failed_order.push(n) + else + order.push(n) + }); + if (order.length) tasks.push({ type: ORDER_BACKUP, - order: _list.get(new_order) + order: _list.get(order) }) - } _nextNode.send(packet_.constuct(tasks)); - }).catch(error => { - FATAL.RECONNECTION_REQUIRED //TODO - }) + if (failed.length) + handshakeMid.timeout = setTimeout(_ => handshakeMid.requestData(failed, failed_order), RETRY_TIMEOUT) + }); } //Complete handshake @@ -444,6 +504,33 @@ function tagBackupData(data, from, packet) { } } +//Store (migrated) data +function storeMigratedData(data) { + let closestNode = kBucket.closestNode(data.receiverID); + if (_list.serving.includes(closestNode)) { + DB.storeData(closestNode, data); + _nextNode.send(packet_.constuct({ + type: STORE_BACKUP_DATA, + data: data + })); + } +} + +//Delete (migrated) data +function deleteMigratedData(old_sn, vectorClock, receiverID, from, packet) { + let closestNode = kBucket.closestNode(receiverID); + if (old_sn !== closestNode && _list.stored.includes(old_sn)) { + DB.deleteData(old_sn, vectorClock); + if (_list[old_sn] < floGlobals.sn_config.backupDepth && + _nextNode.id !== from) + _nextNode.send(packet); + } +} + +function initiateRefresh() { + refresher.invoke(false) +} + //Forward incoming to next node function forwardToNextNode(mode, data) { var modeMap = { @@ -457,9 +544,143 @@ function forwardToNextNode(mode, data) { })); } -function dataMigration(node_change){ - console.log("data migration") - //TODO +//Data migration processor +function dataMigration(node_change, flag) { + if (!Object.keys(node_change)) + return; + console.log("Node list changed! Data migration required"); + if (flag) dataMigration.intimateAllNodes(); //Initmate All nodes to call refresher + let new_nodes = [], + del_nodes = []; + for (let n in node_change) + (node_change[n] ? new_nodes : del_nodes).push(n); + if (del_nodes.includes(_prevNode.id)) { + _list[_prevNode.id] = 0; //Temporary serve for the deleted node + _prevNode.close(); + } + setTimeout(() => { + //reconnect next node if current next node is deleted + if (del_nodes.includes(_nextNode.id)) + reconnectNextNode(); + else { //reconnect next node if there are newly added nodes in between self and current next node + let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id) + if (new_nodes.filter(n => innerNodes.includes(n)).length) + reconnectNextNode(); + } + setTimeout(() => { + dataMigration.process_new(new_nodes); + dataMigration.process_del(del_nodes); + }, MIGRATE_WAIT_DELAY); + }, MIGRATE_WAIT_DELAY) +} + +//data migration sub-process: Deleted nodes +dataMigration.process_del = async function(del_nodes) { + if (!del_nodes.length) + return; + let process_nodes = del_nodes.filter(n => _list.serving.includes(n)) + if (process_nodes.length) { + connectToAllActiveNodes().then(ws_connections => { + let remaining = process_nodes.length; + process_nodes.forEach(n => { + DB.getData(n, 0).then(result => { + console.info(`START: Data migration for ${n}`); + //TODO: efficiently handle large number of data instead of loading all into memory + result.forEach(d => { + let closest = kBucket.closestNode(d.receiverID); + if (_list.serving.includes(closest)) { + DB.storeData(closest, d); + _nextNode.send(packet_.constuct({ + type: STORE_BACKUP_DATA, + data: d + })); + } else + ws_connections[closest].send(packet_.constuct({ + type: STORE_MIGRATED_DATA, + data: d + })); + }) + console.info(`END: Data migration for ${n}`); + _list.delete(n); + DB.dropTable(n); + remaining--; + }).catch(error => reject(error)) + }); + const interval = setInterval(() => { + if (remaining <= 0) { + for (let c in ws_connections) + if (ws_connections[c]) + ws_connections[c].close() + clearInterval(interval); + } + }, RETRY_TIMEOUT); + }).catch(error => reject(error)) + } + del_nodes.forEach(n => { + if (!process_nodes.includes(n) && _list.stored.includes(n)) { + _list.delete(n); + DB.dropTable(n); + } + }) +} + +//data migration sub-process: Added nodes +dataMigration.process_new = async function(new_nodes) { + if (!new_nodes.length) + return; + connectToAllActiveNodes(new_nodes).then(ws_connections => { + let process_nodes = _list.serving, + remaining = process_nodes.length; + process_nodes.forEach(n => { + DB.getData(n, 0).then(result => { + //TODO: efficiently handle large number of data instead of loading all into memory + result.forEach(d => { + let closest = kBucket.closestNode(d.receiverID); + if (new_nodes.includes(closest)) { + if (_list.serving.includes(closest)) { + DB.storeData(closest, d); + _nextNode.send(packet_.constuct({ + type: STORE_BACKUP_DATA, + data: d + })); + } else + ws_connections[closest].send(packet_.constuct({ + type: STORE_MIGRATED_DATA, + data: d + })); + _nextNode.send(packet_.constuct({ + type: DELETE_MIGRATED_DATA, + vectorClock: d.vectorClock, + receiverID: d.receiverID, + snID: n + })); + } + }) + remaining--; + }).catch(error => reject(error)) + }); + const interval = setInterval(() => { + if (remaining <= 0) { + for (let c in ws_connections) + if (ws_connections[c]) + ws_connections[c].close() + clearInterval(interval); + } + }, RETRY_TIMEOUT); + }).catch(error => reject(error)) +} + +dataMigration.intimateAllNodes = function() { + connectToAliveNodes().then(ws_connections => { + let packet = packet_.constuct({ + type: INITIATE_REFRESH + }) + for (let n in ws_connections) + if (ws_connections[n]) { + ws_connections[n].send(packet) + ws_connections[n].close() + } + }).catch(error => reject(error)) } //-----EXPORTS----- @@ -470,7 +691,10 @@ module.exports = { dataMigration, SUPERNODE_INDICATOR, _list, - set DB(db){ + set DB(db) { DB = db + }, + set refresher(r) { + refresher = r } } \ No newline at end of file diff --git a/src/start.js b/src/start.js index ae2c132..04028ce 100644 --- a/src/start.js +++ b/src/start.js @@ -34,6 +34,7 @@ function startNode() { //Start Server const server = new Server(config["port"], client, intra); server.refresher = refreshData; + intra.refresher = refreshData; }).catch(error => reject(error)) }).catch(error => reject(error)) } @@ -51,13 +52,13 @@ function loadBase(DB) { const refreshData = { count: null, base: null, - invoke() { + invoke(flag = true) { this.count = floGlobals.sn_config.refreshDelay; - refreshBlockchainData(this.base).then(result => { + refreshBlockchainData(this.base, flag).then(result => { console.log(result) - diskCleanUp() + diskCleanUp(this.base) .then(result => console.info(result)) - .catch(error => console.error(error)) + .catch(warn => console.warn(warn)) }).catch(error => console.error(error)) }, get countdown() { @@ -67,22 +68,22 @@ const refreshData = { } } -function refreshBlockchainData(base) { +function refreshBlockchainData(base, flag) { return new Promise((resolve, reject) => { - readSupernodeConfigFromAPI(base).then(result => { + readSupernodeConfigFromAPI(base, flag).then(result => { console.log(result) kBucket.launch().then(result => { console.log(result) readAppSubAdminListFromAPI(base) .then(result => console.log(result)) - .catch(error => console.warn(error)) + .catch(warn => console.warn(warn)) .finally(_ => resolve("Refreshed Data from blockchain")) }).catch(error => reject(error)) }).catch(error => reject(error)) }) } -function readSupernodeConfigFromAPI(base) { +function readSupernodeConfigFromAPI(base, flag) { return new Promise((resolve, reject) => { floBlockchainAPI.readData(floGlobals.SNStorageID, { ignoreOld: base.lastTx[floGlobals.SNStorageID], @@ -140,7 +141,7 @@ function readSupernodeConfigFromAPI(base) { }); //Process data migration if nodes are changed if (Object.keys(node_change)) - intra.dataMigration(node_change) + intra.dataMigration(node_change, flag) resolve('Updated Supernode Configuration'); }).catch(error => reject(error)) }) @@ -177,7 +178,7 @@ function readAppSubAdminListFromAPI(base) { })); } return new Promise((resolve, reject) => { - Promise.allSettled(results => { + Promise.allSettled(promises).then(results => { if (results.reduce((a, r) => r.status === "rejected" ? ++a : a, 0)) { let error = Object.fromEntries(results.filter(r => r.status === "rejected").map(r => r.reason)); console.error(JSON.stringify(error)); @@ -188,8 +189,29 @@ function readAppSubAdminListFromAPI(base) { }) } -function diskCleanUp(){ - //TODO: Clear all unauthorised data from before deleteDelay +function diskCleanUp(base) { + return new Promise((resolve, reject) => { + let time = Date.now() - base.sn_config.deleteDelay, + promises = []; + + intra._list.serving.forEach(sn => { + //delete all when app is not authorised. + promises.push(DB.clearUnauthorisedAppData(sn, Object.keys(base.appList), time)); + //for each authorised app: delete unofficial data (untaged, unknown sender/receiver) + for (let app in base.appList) + promises.push(DB.clearAuthorisedAppData(sn, app, base.appList[app], base.subAdmins[app], time)); + }) + + Promise.allSettled(promises).then(results => { + let failed = results.filter(r => r.status === "rejected").map(r => r.reason) + if (failed.length) { + console.error(JSON.stringify(failed)); + let success = results.length - failed.length; + reject(`Disk clean-up process has failed at ${100 * success/results.length}%. (Success:${success}|Failed:${failed.count})`) + } else + resolve("Disk clean-up process finished successfully (100%)"); + }).catch(error => reject(error)) + }) } module.exports = startNode; \ No newline at end of file