From 50c820310a51511323ab2f63b1a4e3485b73d8f0 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 6 Aug 2021 04:42:39 +0530 Subject: [PATCH] Bug fixes --- src/database.js | 46 +++++++++++++++------ src/intra.js | 106 +++++++++++++++++++++++++++++------------------- src/main.js | 36 +++++++++++++--- 3 files changed, 130 insertions(+), 58 deletions(-) diff --git a/src/database.js b/src/database.js index ff4da33..7f94a14 100644 --- a/src/database.js +++ b/src/database.js @@ -197,11 +197,15 @@ function Database(user, password, dbname, host = 'localhost') { db.addData = function(snID, data) { return new Promise((resolve, reject) => { - let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(Object.keys(B_struct).map(a => B_struct[a])); + data[L_struct.STATUS] = 1; + data[L_struct.LOG_TIME] = Date.now(); + let attr = Object.keys(H_struct).map(a => H_struct[a]) + .concat(Object.keys(B_struct).map(a => B_struct[a])) + .concat(Object.keys(L_struct).map(a => L_struct[a])); let values = attr.map(a => data[a]); let statement = "INSERT INTO _" + snID + - " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + - "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ")"; + " (" + attr.join(", ") + ") " + + "VALUES (" + attr.map(a => '?').join(", ") + ")"; data = Object.fromEntries(attr.map((a, i) => [a, values[i]])); db.query(statement, values) .then(result => resolve(data)) @@ -298,15 +302,19 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.storeData = function(snID, data) { + db.storeData = function(snID, data, updateLogTime = false) { return new Promise((resolve, reject) => { - let u_attr = Object.keys(B_struct).map(a => B_struct[a]); + if (updateLogTime) + data[L_struct.LOG_TIME] = Date.now(); + let u_attr = Object.keys(B_struct).map(a => B_struct[a]) + .concat(Object.keys(L_struct).map(a => L_struct[a])) + .concat(Object.keys(T_struct).map(a => T_struct[a])); let attr = Object.keys(H_struct).map(a => H_struct[a]).concat(u_attr); let values = attr.map(a => data[a]); let u_values = u_attr.map(a => data[a]); let statement = "INSERT INTO _" + snID + - " (" + attr.join(", ") + ", " + L_struct.STATUS + ", " + L_struct.LOG_TIME + ") " + - "VALUES (" + attr.map(a => '?').join(", ") + ", 1, " + Date.now() + ") " + + " (" + attr.join(", ") + ") " + + "VALUES (" + attr.map(a => '?').join(", ") + ") " + "ON DUPLICATE KEY UPDATE " + u_attr.map(a => a + "=?").join(", "); db.query(statement, values.concat(u_values)) .then(result => resolve(data)) @@ -327,14 +335,26 @@ function Database(user, password, dbname, host = 'localhost') { }); }; + db.deleteData = function(snID, vectorClock) { + return new Promise((resolve, reject) => { + let statement = "DELETE FROM _" + snID + + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; + db.query(statement, [vectorClock]) + .then(result => resolve(result)) + .catch(error => reject(error)); + }); + } + db.clearAuthorisedAppData = function(snID, app, adminID, subAdmins, timestamp) { return new Promise((resolve, reject) => { let statement = "DELETE FROM _" + snID + " WHERE ( " + H_struct.TIME + " "?").join(", ") + ") )"; + T_struct.TAG + " IS NULL )" + + (subAdmins.length ? " AND ( " + + H_struct.RECEIVER_ID + " != ? OR " + + H_struct.SENDER_ID + " NOT IN (" + subAdmins.map(a => "?").join(", ") + ") )" : + ""); db.query(statement, [timestamp, app].concat(subAdmins).push(adminID)) .then(result => resolve(result)) .catch(error => reject(error)); @@ -344,8 +364,10 @@ function Database(user, password, dbname, host = 'localhost') { db.clearUnauthorisedAppData = function(snID, appList, timestamp) { return new Promise((resolve, reject) => { let statement = "DELETE FROM _" + snID + - "WHERE " + H_struct.TIME + " "?").join(", ") + ")"; + " WHERE " + H_struct.TIME + " "?").join(", ") + ")" : + ""); db.query(statement, [timestamp].concat(appList)) .then(result => resolve(result)) .catch(error => reject(error)); diff --git a/src/intra.js b/src/intra.js index fee5d19..7b1d27b 100644 --- a/src/intra.js +++ b/src/intra.js @@ -145,7 +145,8 @@ packet_.parse = function(str) { if (!Array.isArray(packet.message)) packet.message = [packet.message]; return packet; - }; + } else + return false; } catch (error) { console.error(str, error); return false; @@ -184,26 +185,35 @@ function connectToActiveNode(snID, reverse = false) { }; //Connect to next available node -function connectToNextNode() { +function connectToNextNode(curNode = myFloID) { return new Promise((resolve, reject) => { - let nextNodeID = kBucket.nextNode(myFloID); - connectToActiveNode(nextNodeID).then(ws => { + if (curNode === myFloID && !(myFloID in floGlobals.supernodes)) + return reject(`This (${myFloID}) is not a supernode`); + let nextNodeID = kBucket.nextNode(curNode); + if (nextNodeID === myFloID) + return reject("No other node online"); + connectToNode(nextNodeID).then(ws => { _nextNode.set(nextNodeID, ws); _nextNode.send(packet_.construct({ type: BACKUP_HANDSHAKE_INIT })); resolve("BACKUP_HANDSHAKE_INIT: " + nextNodeID); - }).catch(error => reject(error)); + }).catch(error => { + connectToNextNode(nextNodeID) + .then(result => resolve(result)) + .catch(error => reject(error)); + }); }); }; function connectToAliveNodes(nodes = null) { if (!Array.isArray(nodes)) nodes = Object.keys(floGlobals.supernodes); + nodes = nodes.filter(n => n !== myFloID); return new Promise((resolve, reject) => { Promise.allSettled(nodes.map(n => connectToNode(n))).then(results => { let ws_connections = {}; nodes.forEach((n, i) => - ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null); + ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null); resolve(ws_connections); }); }); @@ -216,7 +226,7 @@ function connectToAllActiveNodes(nodes = null) { Promise.allSettled(nodes.map(n => connectToActiveNode(n))).then(results => { let ws_connections = {}; nodes.forEach((n, i) => - ws_connections[n] = (results.status === "fulfilled") ? results[i].value : null); + ws_connections[n] = (results[i].status === "fulfilled") ? results[i].value : null); resolve(ws_connections); }); }); @@ -427,7 +437,7 @@ function reconnectNextNode() { .then(result => console.log(result)) .catch(error => { //Case: No other node is online - console.info("No other node online"); + console.info(error); //Serve all nodes for (let sn in floGlobals.supernodes) DB.createTable(sn) @@ -534,7 +544,7 @@ function dataSyncIndication(snID, status, from) { function storeBackupData(data, from, packet) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { - DB.storeData(closestNode, data); + DB.storeData(closestNode, data).then(_ => null).catch(e => console.error(e)); if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) _nextNode.send(packet); }; @@ -544,7 +554,7 @@ function storeBackupData(data, from, packet) { function tagBackupData(data, from, packet) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.stored.includes(closestNode)) { - DB.storeTag(closestNode, data); + DB.storeTag(closestNode, data).then(_ => null).catch(e => console.error(e)); if (_list[closestNode] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) _nextNode.send(packet); }; @@ -554,7 +564,7 @@ function tagBackupData(data, from, packet) { function storeMigratedData(data) { let closestNode = kBucket.closestNode(data.receiverID); if (_list.serving.includes(closestNode)) { - DB.storeData(closestNode, data); + DB.storeData(closestNode, data, true).then(_ => null).catch(e => console.error(e)); _nextNode.send(packet_.construct({ type: STORE_BACKUP_DATA, data: data @@ -566,14 +576,14 @@ function storeMigratedData(data) { function deleteMigratedData(data, from, packet) { let closestNode = kBucket.closestNode(data.receiverID); if (data.snID !== closestNode && _list.stored.includes(data.snID)) { - DB.deleteData(data.snID, data.vectorClock); + DB.deleteData(data.snID, data.vectorClock).then(_ => null).catch(e => console.error(e)); if (_list[data.snID] < floGlobals.sn_config.backupDepth && _nextNode.id !== from) _nextNode.send(packet); }; }; function initiateRefresh() { - refresher.invoke(false).then(_ => null).catch(_ => null); + refresher.invoke(false).then(_ => null).catch(e => console.error(e)); }; //Forward incoming to next node @@ -599,19 +609,29 @@ function dataMigration(node_change, flag) { del_nodes = []; for (let n in node_change) (node_change[n] ? new_nodes : del_nodes).push(n); - if (del_nodes.includes(_prevNode.id)) { + if (_prevNode.id && del_nodes.includes(_prevNode.id)) { _list[_prevNode.id] = 0; //Temporary serve for the deleted node _prevNode.close(); }; setTimeout(() => { //reconnect next node if current next node is deleted - if (del_nodes.includes(_nextNode.id)) - reconnectNextNode(); - else { //reconnect next node if there are newly added nodes in between self and current next node - let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id); - if (new_nodes.filter(n => innerNodes.includes(n)).length) + if (_nextNode.id) { + if (del_nodes.includes(_nextNode.id)) reconnectNextNode(); + else { //reconnect next node if there are newly added nodes in between self and current next node + let innerNodes = kBucket.innerNodes(myFloID, _nextNode.id); + if (new_nodes.filter(n => innerNodes.includes(n)).length) + reconnectNextNode(); + }; + } else if (!_prevNode.id) { + //no other nodes online: server all new nodes too + new_nodes.forEach(n => { + DB.createTable(n) + .then(result => _list[n] = 0) + .catch(error => console.error(error)); + }); }; + setTimeout(() => { dataMigration.process_new(new_nodes); dataMigration.process_del(del_nodes); @@ -634,11 +654,12 @@ dataMigration.process_del = async function(del_nodes) { result.forEach(d => { let closest = kBucket.closestNode(d.receiverID); if (_list.serving.includes(closest)) { - DB.storeData(closest, d); - _nextNode.send(packet_.construct({ - type: STORE_BACKUP_DATA, - data: d - })); + DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); + if (_nextNode.id) + _nextNode.send(packet_.construct({ + type: STORE_BACKUP_DATA, + data: d + })); } else ws_connections[closest].send(packet_.construct({ type: STORE_MIGRATED_DATA, @@ -647,9 +668,9 @@ dataMigration.process_del = async function(del_nodes) { }); console.log(`END: Data migration for ${n}`); _list.delete(n); - DB.dropTable(n); + DB.dropTable(n).then(_ => null).catch(e => console.error(e)); remaining--; - }).catch(error => reject(error)); + }).catch(error => console.error(error)); }); const interval = setInterval(() => { if (remaining <= 0) { @@ -664,7 +685,7 @@ dataMigration.process_del = async function(del_nodes) { del_nodes.forEach(n => { if (!process_nodes.includes(n) && _list.stored.includes(n)) { _list.delete(n); - DB.dropTable(n); + DB.dropTable(n).then(_ => null).catch(e => console.error(e)); }; }); }; @@ -683,28 +704,31 @@ dataMigration.process_new = async function(new_nodes) { let closest = kBucket.closestNode(d.receiverID); if (new_nodes.includes(closest)) { if (_list.serving.includes(closest)) { - DB.storeData(closest, d); - _nextNode.send(packet_.construct({ - type: STORE_BACKUP_DATA, - data: d - })); + DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); + if (_nextNode.id) + _nextNode.send(packet_.construct({ + type: STORE_BACKUP_DATA, + data: d + })); } else ws_connections[closest].send(packet_.construct({ type: STORE_MIGRATED_DATA, data: d })); - _nextNode.send(packet_.construct({ - type: DELETE_MIGRATED_DATA, - data: { - vectorClock: d.vectorClock, - receiverID: d.receiverID, - snID: n - } - })); + DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); + if (_nextNode.id) + _nextNode.send(packet_.construct({ + type: DELETE_MIGRATED_DATA, + data: { + vectorClock: d.vectorClock, + receiverID: d.receiverID, + snID: n + } + })); }; }); remaining--; - }).catch(error => reject(error)); + }).catch(error => console.error(error)); }); const interval = setInterval(() => { if (remaining <= 0) { diff --git a/src/main.js b/src/main.js index bbbe930..bf3a5d7 100644 --- a/src/main.js +++ b/src/main.js @@ -34,8 +34,8 @@ function startNode() { floGlobals.appList = base.appList; floGlobals.appSubAdmins = base.appSubAdmins; refreshData.base = base; - refreshData.invoke() - .then(_ => intra.reconnectNextNode()).catch(_ => null) + refreshData.invoke(null) + .then(_ => intra.reconnectNextNode()).catch(_ => null); //Start Server const server = new Server(config["port"], client, intra); server.refresher = refreshData; @@ -59,10 +59,10 @@ const refreshData = { base: null, invoke(flag = true) { return new Promise((resolve, reject) => { - this.count = floGlobals.sn_config.refreshDelay; console.info("Refresher processor has started at " + Date()); refreshBlockchainData(this.base, flag).then(result => { console.log(result); + this.count = floGlobals.sn_config.refreshDelay; diskCleanUp(this.base) .then(result => console.log(result)) .catch(warn => console.warn(warn)) @@ -153,8 +153,13 @@ function readSupernodeConfigFromAPI(base, flag) { console.warn("Some data might not have been saved in database correctly"); }); //Process data migration if nodes are changed - if (Object.keys(node_change).length) - intra.dataMigration(node_change, flag); + if (Object.keys(node_change).length) { + if (flag === null) + selfDiskMigration(node_change); //When node starts for the 1st time after been inactive, but migration has already taken place. + else + intra.dataMigration(node_change, flag); + } + resolve('Updated Supernode Configuration'); }).catch(error => reject(error)); }); @@ -227,4 +232,25 @@ function diskCleanUp(base) { }); }; +function selfDiskMigration(node_change) { + DB.query("SHOW TABLES").then(result => { + const disks = []; + for (let i in result) + for (let j in result[i]) + if (result[i][j].startsWith("_")) + disks.push(result[i][j].split("_")[1]); + disks.forEach(n => { + if (node_change[n] === false) + DB.dropTable(n).then(_ => null).catch(_ => null); + DB.getData(n, 0).then(result => { + result.forEach(d => { + let closest = kBucket.closestNode(d.receiverID); + if (closest !== n) + DB.deleteData(n, d.vectorClock).then(_ => null).catch(_ => null); + }); + }).catch(error => console.error(error)); + }); + }).catch(error => console.error(error)); +}; + module.exports = startNode; \ No newline at end of file