From 6b69469477b2577b58ba665454f1c108abf18383 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Mon, 26 Jul 2021 05:31:54 +0530 Subject: [PATCH] Bug fixes - Changed some console.log to console.debug (Also added few more). - Fixed: DB.lastLogTime not returning correct value - Fixed: MySQL disconnection bug. Now using Pool connect for persistent connection. - Fixed: connectNode not connecting properly due to a typo. - Fixed: RECONNECT_NEXT_NODE const value not added - Fixed: reconnectNextNode not triggered when only one node was online and a node comes online. - Fixed: orderBackup not storing the order properly when less or just sufficient nodes available. - Added: orderBackup to requesting data when sync is required - Fixed: typo in sendStoredData - Added: logInterval of _prevNode, _nextNode and _list --- src/client.js | 4 +-- src/database.js | 47 ++++++++++++++++++++---------- src/intra.js | 77 +++++++++++++++++++++++++++++++++++++++---------- src/server.js | 5 ++-- 4 files changed, 99 insertions(+), 34 deletions(-) diff --git a/src/client.js b/src/client.js index 35548a6..8982086 100644 --- a/src/client.js +++ b/src/client.js @@ -28,10 +28,10 @@ function processIncomingData(data) { else return reject("Invalid Data-format"); process.then(result => { - console.log(result); + console.debug(result); resolve(result); }).catch(error => { - console.error(error); + console.debug(error); reject(error); }); }; diff --git a/src/database.js b/src/database.js index 61771c9..1ea4ff4 100644 --- a/src/database.js +++ b/src/database.js @@ -56,11 +56,6 @@ const T_struct = { function Database(user, password, dbname, host = 'localhost') { const db = {}; - db.query = (s, v) => new Promise((res, rej) => { - //console.log("\nSQL:",s, v); - const fn = ((e, r) => e ? rej(e) : res(r)); - v ? db.conn.query(s, v, fn) : db.conn.query(s, fn); - }); db.createBase = function() { return new Promise((resolve, reject) => { @@ -274,9 +269,10 @@ function Database(user, password, dbname, host = 'localhost') { db.lastLogTime = function(snID) { return new Promise((resolve, reject) => { - let statement = "SELECT MAX(" + L_struct.LOG_TIME + ") FROM _" + snID; + let attr = "MAX(" + L_struct.LOG_TIME + ")"; + let statement = "SELECT " + attr + " FROM _" + snID; db.query(statement) - .then(result => resolve(result)) + .then(result => resolve(result[0][attr] || 0)) .catch(error => reject(error)); }); }; @@ -356,22 +352,43 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.close = function() { - db.conn.end(); - }; + Object.defineProperty(db, "connect", { + get: () => new Promise((resolve, reject) => { + db.pool.getConnection((error, conn) => { + if (error) + reject(error); + else + resolve(conn); + }); + }) + }); + + Object.defineProperty(db, "query", { + value: (sql, values) => new Promise((resolve, reject) => { + db.connect.then(conn => { + const fn = (err, res) => { + conn.release(); + (err ? reject(err) : resolve(res)); + }; + if (values) + conn.query(sql, values, fn); + else + conn.query(sql, fn); + }).catch(error => reject(error)); + }) + }); return new Promise((resolve, reject) => { - let conn = mysql.createConnection({ + db.pool = mysql.createPool({ host: host, user: user, password: password, database: dbname }); - conn.connect((err) => { - if (err) return reject(err); - db.conn = conn; + db.connect.then(conn => { + conn.release(); resolve(db); - }); + }).catch(error => reject(error)); }); }; diff --git a/src/intra.js b/src/intra.js index ab8f9ba..3aa760e 100644 --- a/src/intra.js +++ b/src/intra.js @@ -16,6 +16,7 @@ const SUPERNODE_INDICATOR = '$', DATA_SYNC = "dataSync", BACKUP_HANDSHAKE_INIT = "handshakeInitate", BACKUP_HANDSHAKE_END = "handshakeEnd", + RECONNECT_NEXT_NODE = "reconnectNextNode", RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins @@ -153,7 +154,7 @@ function connectToNode(snID) { return new Promise((resolve, reject) => { if (!(snID in floGlobals.supernodes)) return reject(`${snID} is not a supernode`); - const ws = new WebSocket("wss://" + floGlobals.supernodes[nextNodeID].uri + "/"); + const ws = new WebSocket("wss://" + floGlobals.supernodes[snID].uri + "/"); ws.on("error", () => reject(`${snID} is offline`)); ws.on('open', () => resolve(ws)); }); @@ -220,6 +221,7 @@ function connectToAllActiveNodes(nodes = null) { //Tasks from next-node function processTaskFromNextNode(packet) { + console.debug("_nextNode: ", packet); var { from, message @@ -251,6 +253,7 @@ function processTaskFromNextNode(packet) { //Tasks from prev-node function processTaskFromPrevNode(packet) { + console.debug("_prevNode: ", packet); var { from, message @@ -285,6 +288,7 @@ function processTaskFromPrevNode(packet) { //Tasks from any supernode function processTaskFromSupernode(packet, ws) { + console.debug("superNode: ", packet); var { from, message @@ -338,6 +342,8 @@ function handshakeMid(id, ws) { type: BACKUP_HANDSHAKE_END })); }; + if (!_nextNode.id) + reconnectNextNode(); //Reorder storelist let nodes = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID), req_sync = [], @@ -410,7 +416,7 @@ function handshakeEnd() { //Reconnect to next available node function reconnectNextNode() { - if(_nextNode.ws) + if (_nextNode.ws) _nextNode.close(); connectToNextNode() .then(result => console.log(result)) @@ -429,31 +435,67 @@ function reconnectNextNode() { //Order the stored backup function orderBackup(order) { - let new_order = []; - let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID); + let new_order = [], + req_sync = []; + let cur_serve = kBucket.innerNodes(_prevNode.id, myFloID).concat(myFloID); for (let n in order) { - if (order[n] + 1 !== _list[n]) { + if (!cur_serve.includes(n) && order[n] + 1 !== _list[n]) { if (order[n] >= floGlobals.sn_config.backupDepth) DB.dropTable(n).then(_ => null) .catch(error => console.error(error)) .finally(_ => _list.delete(n)); - else if (_list[n] !== 0 || !cur_serve.includes(n)) { + else { + if (_list[n] === undefined) + req_sync.push(n); _list[n] = order[n] + 1; new_order.push(n); }; }; }; - if (new_order.length) { - _nextNode.send(packet_.constuct({ - type: ORDER_BACKUP, - order: _list.get(new_order) - })); - }; + if (!req_sync.length && !new_order.length) + return; //No order change and no need for any data sync + else + orderBackup.requestData(req_sync, new_order); +}; + +orderBackup.requestData = function(req_sync, new_order) { + Promise.allSettled(req_sync.map(n => DB.createGetLastLog(n))).then(result => { + let + lastlogs = {}, + failed = [], + order = [], + failed_order = []; + + req_sync.forEach((s, i) => { + if (result[i].status === "fulfilled") + lastlogs[s] = result[i].value; + else + failed.push(s); + }); + if (Object.keys(lastlogs).length) + _prevNode.send(packet_.constuct({ + type: DATA_REQUEST, + nodes: lastlogs + })); + new_order.forEach(n => { + if (failed.includes(n)) + failed_order.push(n); + else + order.push(n); + }); + if (order.length) //TODO: maybe should wait for sync to finish? + _nextNode.send(packet_.constuct({ + type: ORDER_BACKUP, + order: _list.get(order) + })); + if (failed.length) + setTimeout(_ => orderBackup.requestData(failed, failed_order), RETRY_TIMEOUT); + }); }; //Send stored data function sendStoredData(lastlogs, node) { - for (n in lastlogs) { + for (let n in lastlogs) { if (_list.stored.includes(n)) { DB.getData(n, lastlogs[n]).then(result => { node.send(packet_.constuct({ @@ -461,13 +503,13 @@ function sendStoredData(lastlogs, node) { id: n, status: true })); - console.log(`START: ${snID} data sync(send) to ${node.id}`); + console.log(`START: ${n} data sync(send) to ${node.id}`); //TODO: efficiently handle large number of data instead of loading all into memory result.forEach(d => node.send(packet_.constuct({ type: STORE_BACKUP_DATA, data: d }))); - console.log(`END: ${snID} data sync(send) to ${node.id}`); + console.log(`END: ${n} data sync(send) to ${node.id}`); node.send(packet_.constuct({ type: DATA_SYNC, id: n, @@ -681,12 +723,17 @@ dataMigration.intimateAllNodes = function() { }); }; +const logInterval = setInterval(() => { + console.debug(_prevNode.id, _nextNode.id, _list.get()); +}, RETRY_TIMEOUT); + //-----EXPORTS----- module.exports = { reconnectNextNode, processTaskFromSupernode, forwardToNextNode, dataMigration, + logInterval, SUPERNODE_INDICATOR, _list, set DB(db) { diff --git a/src/server.js b/src/server.js index 5eca981..76290db 100644 --- a/src/server.js +++ b/src/server.js @@ -13,7 +13,7 @@ module.exports = function Server(port, client, intra) { let u = url.parse(req.url, true); if (!u.search) return res.end(""); - console.log("GET:", u.search); + console.debug("GET:", u.search); client.processRequestFromUser(u.query) .then(result => res.end(JSON.stringify(result[0]))) .catch(error => res.end(error.toString())); @@ -22,7 +22,7 @@ module.exports = function Server(port, client, intra) { let data = ''; req.on('data', chunk => data += chunk); req.on('end', () => { - console.log("POST:", data); + console.debug("POST:", data); //process the data storing client.processIncomingData(data).then(result => { res.end(JSON.stringify(result[0])); @@ -50,6 +50,7 @@ module.exports = function Server(port, client, intra) { if (message.startsWith(intra.SUPERNODE_INDICATOR)) intra.processTaskFromSupernode(message, ws); else { + console.debug("WS: ", message); var request = JSON.parse(message); client.processRequestFromUser(request) .then(result => {