diff --git a/src/backup/head.js b/src/backup/head.js index ee3bc0b..f3865a8 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -288,6 +288,9 @@ function startBackupTransmitter(server) { case "BACKUP_SYNC": sync.sendBackupData(request.last_time, request.checksum, ws); break; + case "HASH_SYNC": + sync.sendTableHash(request.tables, ws); + break; case "RE_SYNC": sync.sendTableData(request.tables, ws); break; diff --git a/src/backup/slave.js b/src/backup/slave.js index 1f5d93f..370aadc 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -3,7 +3,8 @@ const WAIT_TIME = 10 * 60 * 1000, BACKUP_INTERVAL = 1 * 60 * 1000, CHECKSUM_INTERVAL = 15, //times of BACKUP_INTERVAL - SINK_KEY_INDICATOR = '$$$'; + SINK_KEY_INDICATOR = '$$$', + HASH_ROW_COUNT = 100; var DB; //Container for Database connection var masterWS = null; //Container for Master websocket connection @@ -219,6 +220,14 @@ function processBackupData(response) { case "SYNC_CHECKSUM": self.checksum = response.checksum; break; + case "SYNC_HASH": + verifyHash(response.hashes) + .then(mismatch => requestTableChunks(mismatch)) + .catch(error => { + console.error(error); + self.close(); + }); + break; } } @@ -337,12 +346,11 @@ function verifyChecksum(checksum_ref) { mismatch.push(table); console.debug("Mismatch:", mismatch); if (!mismatch.length) //Checksum of every table is verified. - return resolve(true); - else //If one or more tables checksum is not correct, re-request the table data - Promise.allSettled(mismatch.map(t => DB.query("TRUNCATE " + t))).then(_ => { - requestReSync(mismatch); - resolve(false); - }) + resolve(true); + else { //If one or more tables checksum is not correct, re-request the table data + requestHash(mismatch); + resolve(false); + } }).catch(error => { console.error(error); reject(false); @@ -350,12 +358,13 @@ function verifyChecksum(checksum_ref) { }) } -function requestReSync(tables) { +function requestHash(tables) { + //TODO: resync only necessary data (instead of entire table) let self = requestInstance; let request = { floID: global.myFloID, pubKey: global.myPubKey, - type: "RE_SYNC", + type: "HASH_SYNC", tables: tables, req_time: Date.now() }; @@ -368,6 +377,60 @@ function requestReSync(tables) { self.add_count = self.delete_count = 0; } +function verifyHash(hashes) { + const getHash = table => new Promise((res, rej) => { + DB.query("SHOW COLUMNS FROM " + table).then(result => { + let columns = result.map(r => r["Field"]).sort(); + DB.query(`SELECT CEIL(id/${HASH_ROW_COUNT}) as group_id, MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash FROM ${table} GROUP BY group_id ORDER BY group_id`) + .then(result => res(Object.fromEntries(result.map(r => [r.group_id, r.hash])))) + .catch(error => rej(error)) + }).catch(error => rej(error)) + }); + const convertIntArray = obj => Object.keys(obj).map(i => parseInt(i)); + const checkHash = (table, hash_ref) => new Promise((res, rej) => { + getHash(table).then(hash_cur => { + console.debug(hash_ref, hash_cur) + for (let i in hash_ref) + if (hash_ref[i] === hash_cur[i]) { + delete hash_ref[i]; + delete hash_cur[i]; + } + console.debug("HashDiff:", hash_ref, hash_cur); + res([convertIntArray(hash_ref), convertIntArray(hash_cur)]); + }).catch(error => rej(error)) + }) + return new Promise((resolve, reject) => { + let tables = Object.keys(hashes); + Promise.allSettled(tables.map(t => checkHash(t, hashes[t]))).then(result => { + let mismatch = {}; + for (let t in tables) + if (result[t].status === "fulfilled") { + mismatch[tables[t]] = result[t].value; //Data that are incorrect/missing/deleted + //Data to be deleted (incorrect data will be added by resync) + let id_end = result[t].value[1].map(i => i * HASH_ROW_COUNT); //eg if i=2 AND H_R_C = 5 then id_end = 2 * 5 = 10 (ie, range 6-10) + Promise.allSettled(id_end.map(i => + DB.query(`DELETE FROM ${tables[t]} WHERE id BETWEEN ${i - HASH_ROW_COUNT + 1} AND ${i}`))) //eg, i - HASH_ROW_COUNT + 1 = 10 - 5 + 1 = 6 + .then(_ => null); + } else + console.error(result[t].reason); + console.debug("mismatch", mismatch); + resolve(mismatch); + }).catch(error => reject(error)) + }) +} + +function requestTableChunks(tables, ws) { + let request = { + floID: global.myFloID, + pubKey: global.myPubKey, + type: "RE_SYNC", + tables: tables, + req_time: Date.now() + }; + request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); + ws.send(JSON.stringify(tables)); +} + module.exports = { SINK_KEY_INDICATOR, set DB(db) { diff --git a/src/backup/sync.js b/src/backup/sync.js index 0026a65..fca884f 100644 --- a/src/backup/sync.js +++ b/src/backup/sync.js @@ -1,4 +1,5 @@ var DB; //Container for database +const HASH_ROW_COUNT = 100; //Backup Transfer function sendBackupData(timestamp, checksum, ws) { @@ -110,11 +111,34 @@ function backupSync_checksum(ws) { } +function sendTableHash(tables, ws) { + const getHash = table => new Promise((res, rej) => { + DB.query("SHOW COLUMNS FROM " + table).then(result => { + let columns = result.map(r => r["Field"]).sort(); + DB.query(`SELECT CEIL(id/${HASH_ROW_COUNT}) as group_id, MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash FROM ${table} GROUP BY group_id ORDER BY group_id`) + .then(result => res(Object.fromEntries(result.map(r => [r.group_id, r.hash])))) + .catch(error => rej(error)) + }).catch(error => rej(error)) + }); + Promise.allSettled(tables.map(t => getHash(t))).then(result => { + let hashes = {}; + for (let i in tables) + if (result[i].status === "fulfilled") + hashes[tables[i]] = result[i].value; + else + console.error(result[i].reason); + ws.send(JSON.stringify({ + command: "SYNC_HASH", + hashes: hashes + })); + }) +} + function sendTableData(tables, ws) { let promises = [ tableSync_data(tables, ws), tableSync_delete(tables, ws), - tableSync_checksum(tables, ws) + tableSync_checksum(Object.keys(tables), ws) ]; Promise.allSettled(promises).then(result => { let failedSync = []; @@ -137,54 +161,79 @@ function sendTableData(tables, ws) { } function tableSync_delete(tables, ws) { + let getDelete = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_ROW_COUNT, + id_start = id_end - HASH_ROW_COUNT + 1; + DB.query("SELCT * FROM _backup WHERE t_name=? AND mode is NULL AND id BETWEEN ? AND ?", [table, id_start, id_end]) + .then(result => res(result)) + .catch(error => rej(error)) + }) return new Promise((resolve, reject) => { - DB.query(`SELECT * FROM _backup WHERE mode is NULL AND t_name IN (${Array(tables.length).fill("?").join()})`, tables).then(result => { + let promises = []; + for (let t in tables) + for (let g_id in tables[t][1]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(getDelete(t, g_id)); + Promise.allSettled(promises).then(results => { + let delete_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results + delete_sync = [].concat(...delete_sync); //Convert 2d array into 1d ws.send(JSON.stringify({ command: "SYNC_DELETE", - delete_data: result + delete_data: delete_sync })); resolve("deleteSync"); - }).catch(error => { - console.error(error); - reject("deleteSync"); }); }) } function tableSync_data(tables, ws) { - const sendTable = table => new Promise((res, rej) => { - DB.query(`SELECT * FROM ${table}`) - .then(data => { - ws.send(JSON.stringify({ - table, - command: "SYNC_UPDATE", - data - })); - res(table); - }).catch(error => { - console.error(error); - rej(table); - }); + const sendTable = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_ROW_COUNT, + id_start = id_end - HASH_ROW_COUNT + 1; + DB.query(`SELECT * FROM ${table} WHERE id BETWEEN ? AND ?`, [id_start, id_end]).then(data => { + ws.send(JSON.stringify({ + table, + command: "SYNC_UPDATE", + data + })); + res(table); + }).catch(error => { + console.error(error); + rej(table); + }); }); + const getUpdate = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_ROW_COUNT, + id_start = id_end - HASH_ROW_COUNT + 1; + DB.query("SELCT * FROM _backup WHERE t_name=? AND mode=TRUE AND id BETWEEN ? AND ?", [table, id_start, id_end]) + .then(result => res(result)) + .catch(error => rej(error)) + }) return new Promise((resolve, reject) => { - DB.query(`SELECT * FROM _backup WHERE mode=TRUE AND t_name IN (${Array(tables.length).fill("?").join()})`, tables).then(result => { + let promises = []; + for (let t in tables) + for (let g_id in tables[t][0]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(getUpdate(t, g_id)); + Promise.allSettled(promises).then(results => { + let update_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results + update_sync = [].concat(...update_sync); //Convert 2d array into 1d ws.send(JSON.stringify({ command: "SYNC_HEADER", - add_data: result + add_data: update_sync })); - Promise.allSettled(tables.map(t => sendTable(t))).then(result => { + let promises = []; + for (let t in tables) + for (let g_id in tables[t][0]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(sendTable(t, g_id)); + Promise.allSettled(promises).then(result => { let failedTables = []; result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); if (failedTables.length) - reject(["dataSync", failedTables]); + reject(["dataSync", [...new Set(failedTables)]]); else resolve("dataSync"); }); - }).catch(error => { - console.error(error); - reject("dataSync"); }); - }); + }) } function tableSync_checksum(tables, ws) { @@ -206,6 +255,7 @@ function tableSync_checksum(tables, ws) { module.exports = { sendBackupData, + sendTableHash, sendTableData, set DB(db) { DB = db;