Improvement: streamed DB.readAllData

- DB.readAllData is now a streaming function (readAllDataStream)
- Large amounts of data from the database can now be processed without loading everything into memory (a usage sketch follows below)
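For illustration, a minimal sketch of how a call site consumes the new streaming API, matching the call sites changed in this commit; snID, logtime, and handleRow are placeholders, not part of the commit:

// Each row is passed to the callback one at a time (the connection is paused
// while the callback runs); the promise resolves with the total row count.
DB.readAllDataStream(snID, logtime, row => {
    handleRow(row); // placeholder per-row handler; rows are not buffered in memory
}).then(count => console.info(`streamed ${count} records`))
    .catch(error => console.error(error));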
sairajzero 2022-11-30 03:50:12 +05:30
parent fd2aa002c0
commit ee8089f858
3 changed files with 117 additions and 96 deletions

View File

@@ -342,12 +342,12 @@ function Database(user, password, dbname, host = 'localhost') {
         });
     };
-    db.readAllData = function(snID, logtime) {
+    db.readAllDataStream = function (snID, logtime, callback) {
         return new Promise((resolve, reject) => {
             let statement = "SELECT * FROM _" + snID +
                 " WHERE " + L_struct.LOG_TIME + ">" + logtime +
                 " ORDER BY " + L_struct.LOG_TIME;
-            db.query(statement)
+            db.query_stream(statement, callback)
                 .then(result => resolve(result))
                 .catch(error => reject(error));
         });
@@ -462,6 +462,29 @@ function Database(user, password, dbname, host = 'localhost') {
         })
     });
+    Object.defineProperty(db, "query_stream", {
+        value: (sql, value, callback) => new Promise((resolve, reject) => {
+            db.connect.then(conn => {
+                if (!callback && value instanceof Function) {
+                    callback = value;
+                    value = undefined;
+                }
+                var err_flag, row_count = 0;
+                (value ? conn.query(sql, value) : conn.query(sql))
+                    .on('error', err => err_flag = err)
+                    .on('result', row => {
+                        row_count++;
+                        conn.pause();
+                        callback(row);
+                        conn.resume();
+                    }).on('end', _ => {
+                        conn.release();
+                        err_flag ? reject(err_flag) : resolve(row_count);
+                    });
+            })
+        })
+    })
     return new Promise((resolve, reject) => {
         db.pool = mysql.createPool({
             host: host,
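For reference, query_stream builds on the mysql driver's documented row-streaming mode: calling conn.query() without a callback returns a query object that emits a 'result' event per row, and the connection can be paused while each row is handled so the result set is never buffered in memory. A minimal standalone sketch of that underlying pattern (the connection options, table name, and handleRow are placeholders):

const mysql = require('mysql');
const conn = mysql.createConnection({ /* host, user, password, database */ });

conn.query('SELECT * FROM big_table')          // no callback: rows are streamed
    .on('error', err => console.error(err))
    .on('result', row => {
        conn.pause();                          // stop fetching further rows
        handleRow(row);                        // placeholder: process a single row
        conn.resume();                         // request the next row
    })
    .on('end', () => conn.end());              // all rows delivered (or after an error)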

View File

@@ -517,25 +517,24 @@ orderBackup.requestData = function (req_sync, new_order) {
 function sendStoredData(lastlogs, node) {
     for (let n in lastlogs) {
         if (_list.stored.includes(n)) {
-            DB.readAllData(n, lastlogs[n]).then(result => {
             node.send(packet_.construct({
                 type: DATA_SYNC,
                 id: n,
                 status: true
             }));
             console.info(`START: ${n} data sync(send) to ${node.id}`);
-            //TODO: efficiently handle large number of data instead of loading all into memory
-            result.forEach(d => node.send(packet_.construct({
+            DB.readAllDataStream(n, lastlogs[n], d => node.send(packet_.construct({
                 type: STORE_BACKUP_DATA,
                 data: d
-            })));
-            console.info(`END: ${n} data sync(send) to ${node.id}`);
-            node.send(packet_.construct({
+            }))).then(result => console.info(`END: ${n} data sync(send) to ${node.id} (${result} records)`))
+                .catch(error => {
+                    console.info(`ERROR: ${n} data sync(send) to ${node.id}`);
+                    console.error(error);
+                }).finally(_ => node.send(packet_.construct({
                 type: DATA_SYNC,
                 id: n,
                 status: false
-            }));
-            }).catch(error => console.error(error));
+            })));
         };
     };
 };
@@ -664,10 +663,8 @@ dataMigration.process_del = async function (del_nodes, old_kb) {
     connectToAllActiveNodes().then(ws_connections => {
         let remaining = process_nodes.length;
         process_nodes.forEach(n => {
-            DB.readAllData(n, 0).then(result => {
-                console.info(`START: Data migration for ${n}`);
-                //TODO: efficiently handle large number of data instead of loading all into memory
-                result.forEach(d => {
+            console.info(`START: Data migration (del) for ${n}`);
+            DB.readAllDataStream(n, 0, d => {
                 let closest = cloud.closestNode(d.receiverID);
                 if (_list.serving.includes(closest)) {
                     DB.storeData(closest, d, true).then(_ => null).catch(e => console.error(e));
@@ -681,12 +678,14 @@ dataMigration.process_del = async function (del_nodes, old_kb) {
                     type: STORE_MIGRATED_DATA,
                     data: d
                 }));
-            });
-            console.info(`END: Data migration for ${n}`);
+            }).then(result => {
+                console.info(`END: Data migration (del) for ${n}`);
                 _list.delete(n);
                 DB.dropTable(n).then(_ => null).catch(e => console.error(e));
-            remaining--;
-            }).catch(error => console.error(error));
+            }).catch(error => {
+                console.info(`ERROR: Data migration (del) for ${n}`);
+                console.error(error);
+            }).finally(_ => remaining--);
         });
         const interval = setInterval(() => {
             if (remaining <= 0) {
@@ -714,9 +713,8 @@ dataMigration.process_new = async function (new_nodes) {
     let process_nodes = _list.serving,
         remaining = process_nodes.length;
     process_nodes.forEach(n => {
-        DB.readAllData(n, 0).then(result => {
-            //TODO: efficiently handle large number of data instead of loading all into memory
-            result.forEach(d => {
+        console.info(`START: Data migration (re-new) for ${n}`);
+        DB.readAllDataStream(n, 0, d => {
             let closest = cloud.closestNode(d.receiverID);
             if (new_nodes.includes(closest)) {
                 if (_list.serving.includes(closest)) {
@@ -742,9 +740,11 @@ dataMigration.process_new = async function (new_nodes) {
             }
         }));
     };
-    });
-    remaining--;
-    }).catch(error => console.error(error));
+    }).then(result => console.info(`END: Data migration (re-new) for ${n}`))
+        .catch(error => {
+            console.info(`ERROR: Data migration (re-new) for ${n}`);
+            console.error(error);
+        }).finally(_ => remaining--);
     });
     const interval = setInterval(() => {
         if (remaining <= 0) {

View File

@@ -266,13 +266,11 @@ function selfDiskMigration(node_change) {
         disks.forEach(n => {
             if (node_change[n] === false)
                 DB.dropTable(n).then(_ => null).catch(e => console.error(e));
-            DB.readAllData(n, 0).then(result => {
-                result.forEach(d => {
+            DB.readAllDataStream(n, 0, d => {
                 let closest = cloud.closestNode(d.receiverID);
                 if (closest !== n)
                     DB.deleteData(n, d.vectorClock).then(_ => null).catch(e => console.error(e));
-                });
-            }).catch(error => console.error(error));
+            }).then(result => console.debug(`Completed self-disk migration for ${n}`)).catch(error => console.error(error));
         });
     }).catch(error => console.error(error));
 };