Re-sync Improvements

When table data needs to be re-synced (i.e., when a checksum check fails), instead of requesting the entire data, the following procedure is used:
- row-group hashes (grouped by the id column) are requested from the master
- the hashes are verified against the local DB
- only the chunks that failed the hash check are re-requested
This commit is contained in:
sairajzero 2022-02-17 04:43:05 +05:30
parent d30603134f
commit 12e90abe8b
3 changed files with 152 additions and 36 deletions

View File

@ -288,6 +288,9 @@ function startBackupTransmitter(server) {
case "BACKUP_SYNC":
sync.sendBackupData(request.last_time, request.checksum, ws);
break;
case "HASH_SYNC":
sync.sendTableHash(request.tables, ws);
break;
case "RE_SYNC":
sync.sendTableData(request.tables, ws);
break;

View File

@ -3,7 +3,8 @@
const WAIT_TIME = 10 * 60 * 1000,
BACKUP_INTERVAL = 1 * 60 * 1000,
CHECKSUM_INTERVAL = 15, //times of BACKUP_INTERVAL
SINK_KEY_INDICATOR = '$$$';
SINK_KEY_INDICATOR = '$$$',
HASH_ROW_COUNT = 100;
var DB; //Container for Database connection
var masterWS = null; //Container for Master websocket connection
@ -219,6 +220,14 @@ function processBackupData(response) {
case "SYNC_CHECKSUM":
self.checksum = response.checksum;
break;
case "SYNC_HASH":
verifyHash(response.hashes)
.then(mismatch => requestTableChunks(mismatch))
.catch(error => {
console.error(error);
self.close();
});
break;
}
}
@ -337,12 +346,11 @@ function verifyChecksum(checksum_ref) {
mismatch.push(table);
console.debug("Mismatch:", mismatch);
if (!mismatch.length) //Checksum of every table is verified.
return resolve(true);
else //If one or more tables checksum is not correct, re-request the table data
Promise.allSettled(mismatch.map(t => DB.query("TRUNCATE " + t))).then(_ => {
requestReSync(mismatch);
resolve(false);
})
resolve(true);
else { //If one or more tables checksum is not correct, re-request the table data
requestHash(mismatch);
resolve(false);
}
}).catch(error => {
console.error(error);
reject(false);
@ -350,12 +358,13 @@ function verifyChecksum(checksum_ref) {
})
}
function requestReSync(tables) {
function requestHash(tables) {
//TODO: resync only necessary data (instead of entire table)
let self = requestInstance;
let request = {
floID: global.myFloID,
pubKey: global.myPubKey,
type: "RE_SYNC",
type: "HASH_SYNC",
tables: tables,
req_time: Date.now()
};
@ -368,6 +377,60 @@ function requestReSync(tables) {
self.add_count = self.delete_count = 0;
}
function verifyHash(hashes) {
    /**
     * Compares row-group hashes received from the master against hashes computed
     * on the local DB, deletes the local row-groups that do not match, and reports
     * which groups must be re-requested.
     * @param {Object} hashes - map: table name -> { group_id: md5-hash } (from master's SYNC_HASH)
     * @returns {Promise<Object>} resolves to a mismatch map:
     *          table name -> [ref_group_ids, cur_group_ids] where ref_group_ids are
     *          the master-side groups to re-request and cur_group_ids are the local
     *          groups that failed the check (deleted here before resolving).
     */
    //Compute { group_id: hash } for a table; rows are hashed in chunks of HASH_ROW_COUNT ids
    const getHash = table => new Promise((res, rej) => {
        DB.query("SHOW COLUMNS FROM " + table).then(result => {
            let columns = result.map(r => r["Field"]).sort(); //sorted so column order matches the master's hash computation
            DB.query(`SELECT CEIL(id/${HASH_ROW_COUNT}) as group_id, MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash FROM ${table} GROUP BY group_id ORDER BY group_id`)
                .then(result => res(Object.fromEntries(result.map(r => [r.group_id, r.hash]))))
                .catch(error => rej(error))
        }).catch(error => rej(error))
    });
    const convertIntArray = obj => Object.keys(obj).map(i => parseInt(i, 10)); //explicit radix
    //Remove matching groups from both maps; whatever remains on either side is a mismatch
    const checkHash = (table, hash_ref) => new Promise((res, rej) => {
        getHash(table).then(hash_cur => {
            for (let i in hash_ref)
                if (hash_ref[i] === hash_cur[i]) {
                    delete hash_ref[i]; //NOTE: mutates the caller's hashes object
                    delete hash_cur[i];
                }
            console.debug("HashDiff:", hash_ref, hash_cur);
            res([convertIntArray(hash_ref), convertIntArray(hash_cur)]);
        }).catch(error => rej(error))
    })
    return new Promise((resolve, reject) => {
        let tables = Object.keys(hashes);
        Promise.allSettled(tables.map(t => checkHash(t, hashes[t]))).then(result => {
            let mismatch = {},
                deletions = [];
            for (let t in tables)
                if (result[t].status === "fulfilled") {
                    mismatch[tables[t]] = result[t].value; //Data that are incorrect/missing/deleted
                    //Delete the local row-groups that failed the check (correct data will be re-added by resync)
                    let id_end = result[t].value[1].map(i => i * HASH_ROW_COUNT); //eg if i=2 AND H_R_C = 5 then id_end = 2 * 5 = 10 (ie, range 6-10)
                    deletions.push(...id_end.map(i =>
                        DB.query(`DELETE FROM ${tables[t]} WHERE id BETWEEN ${i - HASH_ROW_COUNT + 1} AND ${i}`))); //eg, i - HASH_ROW_COUNT + 1 = 10 - 5 + 1 = 6
                } else
                    console.error(result[t].reason);
            //Fix: wait for the deletions to finish BEFORE resolving, so re-synced rows
            //cannot arrive (and get inserted) while the stale rows are still present
            Promise.allSettled(deletions).then(_ => {
                console.debug("mismatch", mismatch);
                resolve(mismatch);
            });
        }).catch(error => reject(error))
    })
}
function requestTableChunks(tables, ws) {
    /**
     * Requests re-sync (RE_SYNC) of the mismatched row-groups from the master.
     * @param {Object} tables - map: table name -> [ref_group_ids, cur_group_ids] (output of verifyHash)
     * @param {WebSocket} [ws] - connection to the master; defaults to masterWS
     *        (NOTE(review): the SYNC_HASH handler calls this without ws — confirm masterWS is the intent)
     */
    if (!ws)
        ws = masterWS;
    let request = {
        floID: global.myFloID,
        pubKey: global.myPubKey,
        type: "RE_SYNC",
        tables: tables,
        req_time: Date.now()
    };
    request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey);
    //Fix: send the signed request object; the original sent the bare tables value,
    //so the signature was never transmitted and the master could not authenticate it
    ws.send(JSON.stringify(request));
}
module.exports = {
SINK_KEY_INDICATOR,
set DB(db) {

View File

@ -1,4 +1,5 @@
var DB; //Container for database
const HASH_ROW_COUNT = 100;
//Backup Transfer
function sendBackupData(timestamp, checksum, ws) {
@ -110,11 +111,34 @@ function backupSync_checksum(ws) {
}
function sendTableHash(tables, ws) {
    /**
     * Computes row-group hashes for each requested table and replies with a
     * SYNC_HASH message over the websocket. Rows are grouped in chunks of
     * HASH_ROW_COUNT by id; each group's columns (sorted by name) are
     * concatenated and MD5-hashed so the slave can diff against its own data.
     * @param {string[]} tables - table names to hash
     * @param {WebSocket} ws - slave connection to reply on
     */
    const hashTable = tableName => new Promise((accept, deny) => {
        DB.query("SHOW COLUMNS FROM " + tableName).then(columnInfo => {
            //sorted column order keeps the hash stable across both nodes
            const orderedColumns = columnInfo.map(row => row["Field"]).sort();
            const concatExpr = orderedColumns.map(c => `IFNULL(${c}, "NULL")`).join();
            DB.query(`SELECT CEIL(id/${HASH_ROW_COUNT}) as group_id, MD5(GROUP_CONCAT(${concatExpr})) as hash FROM ${tableName} GROUP BY group_id ORDER BY group_id`).then(rows => {
                const groupHashes = {};
                for (const row of rows)
                    groupHashes[row.group_id] = row.hash;
                accept(groupHashes);
            }).catch(deny);
        }).catch(deny);
    });
    Promise.allSettled(tables.map(t => hashTable(t))).then(outcome => {
        const hashes = {};
        tables.forEach((tableName, i) => {
            if (outcome[i].status === "fulfilled")
                hashes[tableName] = outcome[i].value; //tables that failed are simply omitted
            else
                console.error(outcome[i].reason);
        });
        ws.send(JSON.stringify({
            command: "SYNC_HASH",
            hashes: hashes
        }));
    })
}
function sendTableData(tables, ws) {
let promises = [
tableSync_data(tables, ws),
tableSync_delete(tables, ws),
tableSync_checksum(tables, ws)
tableSync_checksum(Object.keys(tables), ws)
];
Promise.allSettled(promises).then(result => {
let failedSync = [];
@ -137,54 +161,79 @@ function sendTableData(tables, ws) {
}
function tableSync_delete(tables, ws) {
    /**
     * Sends the _backup delete-markers (mode IS NULL) for the mismatched
     * row-groups of each table, as a single SYNC_DELETE message.
     * @param {Object} tables - map: table name -> [update_group_ids, delete_group_ids]
     * @param {WebSocket} ws - slave connection to reply on
     * @returns {Promise<string>} resolves "deleteSync" on success, rejects "deleteSync" on failure
     */
    let getDelete = (table, group_id) => new Promise((res, rej) => {
        let id_end = group_id * HASH_ROW_COUNT,
            id_start = id_end - HASH_ROW_COUNT + 1;
        //Fix: 'SELCT' typo in the original query
        DB.query("SELECT * FROM _backup WHERE t_name=? AND mode is NULL AND id BETWEEN ? AND ?", [table, id_start, id_end])
            .then(result => res(result))
            .catch(error => rej(error))
    })
    return new Promise((resolve, reject) => {
        let promises = [];
        for (let t in tables) //t is the table name (tables is keyed by name)
            //Fix: 'of' not 'in' — tables[t][1] is the ARRAY of mismatched group ids;
            //'for...in' would yield the array indices ("0","1",...), querying the wrong id ranges
            for (let g_id of tables[t][1])
                promises.push(getDelete(t, g_id));
        Promise.allSettled(promises).then(results => {
            let delete_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results
            delete_sync = [].concat(...delete_sync); //Convert 2d array into 1d
            ws.send(JSON.stringify({
                command: "SYNC_DELETE",
                delete_data: delete_sync
            }));
            resolve("deleteSync");
        }).catch(error => {
            console.error(error);
            reject("deleteSync");
        });
    })
}
function tableSync_data(tables, ws) {
    /**
     * Sends the add/update data for the mismatched row-groups of each table:
     * first a SYNC_HEADER with the _backup header rows (mode=TRUE), then one
     * SYNC_UPDATE per row-group with the actual table rows.
     * @param {Object} tables - map: table name -> [update_group_ids, delete_group_ids]
     * @param {WebSocket} ws - slave connection to reply on
     * @returns {Promise} resolves "dataSync"; rejects ["dataSync", failedTables] or "dataSync"
     */
    //Send the current rows of one row-group of a table (SYNC_UPDATE)
    const sendTable = (table, group_id) => new Promise((res, rej) => {
        let id_end = group_id * HASH_ROW_COUNT,
            id_start = id_end - HASH_ROW_COUNT + 1;
        DB.query(`SELECT * FROM ${table} WHERE id BETWEEN ? AND ?`, [id_start, id_end]).then(data => {
            ws.send(JSON.stringify({
                table,
                command: "SYNC_UPDATE",
                data
            }));
            res(table);
        }).catch(error => {
            console.error(error);
            rej(table);
        });
    });
    //Fetch the _backup header rows (mode=TRUE ie, added/updated) for one row-group
    const getUpdate = (table, group_id) => new Promise((res, rej) => {
        let id_end = group_id * HASH_ROW_COUNT,
            id_start = id_end - HASH_ROW_COUNT + 1;
        //Fix: 'SELCT' typo in the original query
        DB.query("SELECT * FROM _backup WHERE t_name=? AND mode=TRUE AND id BETWEEN ? AND ?", [table, id_start, id_end])
            .then(result => res(result))
            .catch(error => rej(error))
    })
    return new Promise((resolve, reject) => {
        let header_promises = [];
        for (let t in tables) //t is the table name (tables is keyed by name)
            //Fix: 'of' not 'in' — tables[t][0] is the ARRAY of mismatched group ids;
            //'for...in' would yield the array indices, querying the wrong id ranges
            for (let g_id of tables[t][0])
                header_promises.push(getUpdate(t, g_id));
        Promise.allSettled(header_promises).then(results => {
            let update_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results
            update_sync = [].concat(...update_sync); //Convert 2d array into 1d
            ws.send(JSON.stringify({
                command: "SYNC_HEADER",
                add_data: update_sync
            }));
            let data_promises = [];
            for (let t in tables)
                for (let g_id of tables[t][0])
                    data_promises.push(sendTable(t, g_id));
            Promise.allSettled(data_promises).then(result => {
                let failedTables = [];
                result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null);
                if (failedTables.length)
                    reject(["dataSync", [...new Set(failedTables)]]); //de-dupe: a table may fail on several groups
                else
                    resolve("dataSync");
            });
        }).catch(error => {
            console.error(error);
            reject("dataSync");
        });
    })
}
function tableSync_checksum(tables, ws) {
@ -206,6 +255,7 @@ function tableSync_checksum(tables, ws) {
module.exports = {
sendBackupData,
sendTableHash,
sendTableData,
set DB(db) {
DB = db;