From ee8089f858112cbb72e5c0218ba2fd3f8e5f934d Mon Sep 17 00:00:00 2001 From: sairajzero Date: Wed, 30 Nov 2022 03:50:12 +0530 Subject: [PATCH] Improvement: streamed DB.readAllData - DB.readAllData is now a streamed function (readlAllDataStream) - Large number data from database can be processed without any issue --- src/database.js | 75 ++++++++++++++++++---------- src/intra.js | 126 ++++++++++++++++++++++++------------------------ src/main.js | 12 ++--- 3 files changed, 117 insertions(+), 96 deletions(-) diff --git a/src/database.js b/src/database.js index 02e2f19..0af1d00 100644 --- a/src/database.js +++ b/src/database.js @@ -65,7 +65,7 @@ const F_struct = { function Database(user, password, dbname, host = 'localhost') { const db = {}; - db.createBase = function() { + db.createBase = function () { return new Promise((resolve, reject) => { let statements = []; for (let t in Base_Tables) @@ -77,7 +77,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.setLastTx = function(id, n) { + db.setLastTx = function (id, n) { return new Promise((resolve, reject) => { let statement = "INSERT INTO LastTxs (ID, N) VALUES (?, ?)" + " ON DUPLICATE KEY UPDATE N=?"; @@ -87,7 +87,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.setConfig = function(name, value) { + db.setConfig = function (name, value) { return new Promise((resolve, reject) => { let statement = "INSERT INTO Configs (NAME, VAL) VALUES (?, ?)" + " ON DUPLICATE KEY UPDATE VAL=?"; @@ -97,7 +97,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.addSuperNode = function(id, pubKey, uri) { + db.addSuperNode = function (id, pubKey, uri) { return new Promise((resolve, reject) => { let statement = "INSERT INTO SuperNodes (FLO_ID, PUB_KEY, URI) VALUES (?, ?, ?)" + " ON DUPLICATE KEY UPDATE URI=?"; @@ -107,7 +107,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.rmSuperNode = function(id) { + db.rmSuperNode = function (id) { return new Promise((resolve, reject) => { let statement = "DELETE FROM SuperNodes" + " WHERE FLO_ID=?"; @@ -117,7 +117,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.setSubAdmin = function(appName, subAdmins) { + db.setSubAdmin = function (appName, subAdmins) { return new Promise((resolve, reject) => { let statement = "UPDATE Applications" + " SET SUB_ADMINS=?" + @@ -128,7 +128,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.addApp = function(appName, adminID) { + db.addApp = function (appName, adminID) { return new Promise((resolve, reject) => { let statement = "INSERT INTO Applications (APP_NAME, ADMIN_ID) VALUES (?, ?)" + " ON DUPLICATE KEY UPDATE ADMIN_ID=?"; @@ -138,7 +138,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.rmApp = function(appName) { + db.rmApp = function (appName) { return new Promise((resolve, reject) => { let statement = "DELETE FROM Applications" + " WHERE APP_NAME=" + appName; @@ -148,7 +148,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.getBase = function() { + db.getBase = function () { return new Promise((resolve, reject) => { let tables = Object.keys(Base_Tables); Promise.all(tables.map(t => db.query("SELECT * FROM " + t))).then(result => { @@ -167,7 +167,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.createTable = function(snID) { + db.createTable = function (snID) { return new Promise((resolve, reject) => { let statement = "CREATE TABLE IF NOT EXISTS _" + snID + " ( " + H_struct.VECTOR_CLOCK + " VARCHAR(88) NOT NULL, " + @@ -199,7 +199,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.dropTable = function(snID) { + db.dropTable = function (snID) { return new Promise((resolve, reject) => { let statement = "DROP TABLE _" + snID; db.query(statement) @@ -208,7 +208,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.addData = function(snID, data) { + db.addData = function (snID, data) { return new Promise((resolve, reject) => { data[L_struct.STATUS] = 1; data[L_struct.LOG_TIME] = Date.now(); @@ -228,7 +228,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.getData = function(snID, vectorClock) { + db.getData = function (snID, vectorClock) { return new Promise((resolve, reject) => { let statement = "SELECT * FROM _" + snID + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; @@ -238,7 +238,7 @@ function Database(user, password, dbname, host = 'localhost') { }) }; - db.tagData = function(snID, vectorClock, tag, tagTime, tagKey, tagSign) { + db.tagData = function (snID, vectorClock, tag, tagTime, tagKey, tagSign) { return new Promise((resolve, reject) => { let data = { [T_struct.TAG]: tag, @@ -259,7 +259,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.noteData = function(snID, vectorClock, note, noteTime, noteKey, noteSign) { + db.noteData = function (snID, vectorClock, note, noteTime, noteKey, noteSign) { return new Promise((resolve, reject) => { let data = { [F_struct.NOTE]: note, @@ -280,7 +280,7 @@ function Database(user, password, dbname, host = 'localhost') { }); } - db.searchData = function(snID, request) { + db.searchData = function (snID, request) { return new Promise((resolve, reject) => { let conditionArr = []; if (request.lowerVectorClock || request.upperVectorClock || request.atVectorClock) { @@ -322,7 +322,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.lastLogTime = function(snID) { + db.lastLogTime = function (snID) { return new Promise((resolve, reject) => { let attr = "MAX(" + L_struct.LOG_TIME + ")"; let statement = "SELECT " + attr + " FROM _" + snID; @@ -332,7 +332,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.createGetLastLog = function(snID) { + db.createGetLastLog = function (snID) { return new Promise((resolve, reject) => { db.createTable(snID).then(result => { db.lastLogTime(snID) @@ -342,18 +342,18 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.readAllData = function(snID, logtime) { + db.readAllDataStream = function (snID, logtime, callback) { return new Promise((resolve, reject) => { let statement = "SELECT * FROM _" + snID + " WHERE " + L_struct.LOG_TIME + ">" + logtime + " ORDER BY " + L_struct.LOG_TIME; - db.query(statement) + db.query_stream(statement, callback) .then(result => resolve(result)) .catch(error => reject(error)); }); }; - db.storeData = function(snID, data, updateLogTime = false) { + db.storeData = function (snID, data, updateLogTime = false) { return new Promise((resolve, reject) => { if (updateLogTime) data[L_struct.LOG_TIME] = Date.now(); @@ -373,7 +373,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.storeTag = function(snID, data) { + db.storeTag = function (snID, data) { return new Promise((resolve, reject) => { let attr = Object.keys(T_struct).map(a => T_struct[a]).concat(L_struct.LOG_TIME); let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]); @@ -386,7 +386,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.storeNote = function(snID, data) { + db.storeNote = function (snID, data) { let attr = Object.keys(F_struct).map(a => F_struct[a]).concat(L_struct.LOG_TIME); let values = attr.map(a => data[a]).concat(data[H_struct.VECTOR_CLOCK]); let statement = "UPDATE _" + snID + @@ -397,7 +397,7 @@ function Database(user, password, dbname, host = 'localhost') { .catch(error => reject(error)); } - db.deleteData = function(snID, vectorClock) { + db.deleteData = function (snID, vectorClock) { return new Promise((resolve, reject) => { let statement = "DELETE FROM _" + snID + " WHERE " + H_struct.VECTOR_CLOCK + "=?"; @@ -407,7 +407,7 @@ function Database(user, password, dbname, host = 'localhost') { }); }; - db.clearAuthorisedAppData = function(snID, app, adminID, subAdmins, timestamp) { + db.clearAuthorisedAppData = function (snID, app, adminID, subAdmins, timestamp) { return new Promise((resolve, reject) => { let statement = "DELETE FROM _" + snID + " WHERE ( " + H_struct.TIME + " { let statement = "DELETE FROM _" + snID + " WHERE " + H_struct.TIME + " new Promise((resolve, reject) => { + db.connect.then(conn => { + if (!callback && value instanceof Function) { + callback = value; + value = undefined; + } + var err_flag, row_count = 0; + (value ? conn.query(sql, values) : conn.query(sql)) + .on('error', err => err_flag = err) + .on('result', row => { + row_count++; + conn.pause(); + callback(row); + conn.resume(); + }).on('end', _ => { + conn.release(); + err_flag ? reject(err_flag) : resolve(row_count); + }); + }) + }) + }) + return new Promise((resolve, reject) => { db.pool = mysql.createPool({ host: host, diff --git a/src/intra.js b/src/intra.js index 8915592..9015055 100644 --- a/src/intra.js +++ b/src/intra.js @@ -517,25 +517,24 @@ orderBackup.requestData = function (req_sync, new_order) { function sendStoredData(lastlogs, node) { for (let n in lastlogs) { if (_list.stored.includes(n)) { - DB.readAllData(n, lastlogs[n]).then(result => { - node.send(packet_.construct({ - type: DATA_SYNC, - id: n, - status: true - })); - console.info(`START: ${n} data sync(send) to ${node.id}`); - //TODO: efficiently handle large number of data instead of loading all into memory - result.forEach(d => node.send(packet_.construct({ - type: STORE_BACKUP_DATA, - data: d - }))); - console.info(`END: ${n} data sync(send) to ${node.id}`); - node.send(packet_.construct({ + node.send(packet_.construct({ + type: DATA_SYNC, + id: n, + status: true + })); + console.info(`START: ${n} data sync(send) to ${node.id}`); + DB.readAllDataStream(n, lastlogs[n], d => node.send(packet_.construct({ + type: STORE_BACKUP_DATA, + data: d + }))).then(result => console.info(`END: ${n} data sync(send) to ${node.id} (${result} records)`)) + .catch(error => { + console.info(`ERROR: ${n} data sync(send) to ${node.id}`) + console.error(error); + }).finally(_ => node.send(packet_.construct({ type: DATA_SYNC, id: n, status: false - })); - }).catch(error => console.error(error)); + }))); }; }; }; @@ -664,29 +663,29 @@ dataMigration.process_del = async function (del_nodes, old_kb) { connectToAllActiveNodes().then(ws_connections => { let remaining = process_nodes.length; process_nodes.forEach(n => { - DB.readAllData(n, 0).then(result => { - console.info(`START: Data migration for ${n}`); - //TODO: efficiently handle large number of data instead of loading all into memory - result.forEach(d => { - let closest = cloud.closestNode(d.receiverID); - if (_list.serving.includes(closest)) { - DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); - if (_nextNode.id) - _nextNode.send(packet_.construct({ - type: STORE_BACKUP_DATA, - data: d - })); - } else - ws_connections[closest].send(packet_.construct({ - type: STORE_MIGRATED_DATA, + console.info(`START: Data migration (del) for ${n}`); + DB.readAllDataStream(n, 0, d => { + let closest = cloud.closestNode(d.receiverID); + if (_list.serving.includes(closest)) { + DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); + if (_nextNode.id) + _nextNode.send(packet_.construct({ + type: STORE_BACKUP_DATA, data: d })); - }); - console.info(`END: Data migration for ${n}`); + } else + ws_connections[closest].send(packet_.construct({ + type: STORE_MIGRATED_DATA, + data: d + })); + }).then(result => { + console.info(`END: Data migration (del) for ${n}`); _list.delete(n); DB.dropTable(n).then(_ => null).catch(e => console.error(e)); - remaining--; - }).catch(error => console.error(error)); + }).catch(error => { + console.info(`ERROR: Data migration (del) for ${n}`); + console.error(error); + }).finally(_ => remaining--); }); const interval = setInterval(() => { if (remaining <= 0) { @@ -714,37 +713,38 @@ dataMigration.process_new = async function (new_nodes) { let process_nodes = _list.serving, remaining = process_nodes.length; process_nodes.forEach(n => { - DB.readAllData(n, 0).then(result => { - //TODO: efficiently handle large number of data instead of loading all into memory - result.forEach(d => { - let closest = cloud.closestNode(d.receiverID); - if (new_nodes.includes(closest)) { - if (_list.serving.includes(closest)) { - DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); - if (_nextNode.id) - _nextNode.send(packet_.construct({ - type: STORE_BACKUP_DATA, - data: d - })); - } else - ws_connections[closest].send(packet_.construct({ - type: STORE_MIGRATED_DATA, - data: d - })); - DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); + console.info(`START: Data migration (re-new) for ${n}`); + DB.readAllDataStream(n, 0, d => { + let closest = cloud.closestNode(d.receiverID); + if (new_nodes.includes(closest)) { + if (_list.serving.includes(closest)) { + DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e)); if (_nextNode.id) _nextNode.send(packet_.construct({ - type: DELETE_MIGRATED_DATA, - data: { - vectorClock: d.vectorClock, - receiverID: d.receiverID, - snID: n - } + type: STORE_BACKUP_DATA, + data: d })); - }; - }); - remaining--; - }).catch(error => console.error(error)); + } else + ws_connections[closest].send(packet_.construct({ + type: STORE_MIGRATED_DATA, + data: d + })); + DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); + if (_nextNode.id) + _nextNode.send(packet_.construct({ + type: DELETE_MIGRATED_DATA, + data: { + vectorClock: d.vectorClock, + receiverID: d.receiverID, + snID: n + } + })); + }; + }).then(result => console.info(`END: Data migration (re-new) for ${n}`)) + .catch(error => { + console.info(`ERROR: Data migration (re-new) for ${n}`); + console.error(error); + }).finally(_ => remaining--); }); const interval = setInterval(() => { if (remaining <= 0) { diff --git a/src/main.js b/src/main.js index 789c701..dd37cf5 100644 --- a/src/main.js +++ b/src/main.js @@ -266,13 +266,11 @@ function selfDiskMigration(node_change) { disks.forEach(n => { if (node_change[n] === false) DB.dropTable(n).then(_ => null).catch(e => console.error(e)); - DB.readAllData(n, 0).then(result => { - result.forEach(d => { - let closest = cloud.closestNode(d.receiverID); - if (closest !== n) - DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); - }); - }).catch(error => console.error(error)); + DB.readAllDataStream(n, 0, d => { + let closest = cloud.closestNode(d.receiverID); + if (closest !== n) + DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e)); + }).then(result => console.debug(`Completed self-disk migration for ${n}`)).catch(error => console.error(error)); }); }).catch(error => console.error(error)); };