From 2a125eb4d24a0cfd2e3cf7b3032dc8e98d4e94a9 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Wed, 16 Feb 2022 01:55:31 +0530 Subject: [PATCH] Verify backup data integrity - Backup data integrity is verified using Checksum for each table - If checksum mismatch is found (for a table) after a backup sync, the table data is cleared and re-synced - Moved backup sync module to backup/sync.js - Fixed: checkForRatedSellers not using correct asset when checking for rated sellers - Others: priceHistory is converted toFixed(3) before storing in DB. (As large/non-terminating decimals cause ambiguity btw nodes during checksum) --- src/backup/head.js | 93 ++----------------- src/backup/slave.js | 150 +++++++++++++++++++++++-------- src/backup/sync.js | 213 ++++++++++++++++++++++++++++++++++++++++++++ src/price.js | 6 +- 4 files changed, 333 insertions(+), 129 deletions(-) create mode 100644 src/backup/sync.js diff --git a/src/backup/head.js b/src/backup/head.js index 5907019..dc31623 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -2,6 +2,7 @@ const K_Bucket = require('../../public/KBucket'); const slave = require('./slave'); +const sync = require('./sync'); const WebSocket = require('ws'); const shareThreshold = 50 / 100; @@ -13,91 +14,6 @@ var nodeShares = null, const SLAVE_MODE = 0, MASTER_MODE = 1; -//Backup Transfer -function sendBackup(timestamp, ws) { - if (!timestamp) timestamp = 0; - else if (typeof timestamp === "string" && /\.\d{3}Z$/.test(timestamp)) - timestamp = timestamp.substring(0, timestamp.length - 1); - let promises = [ - send_dataSync(timestamp, ws), - send_deleteSync(timestamp, ws) - ]; - Promise.allSettled(promises).then(result => { - let failedSync = []; - result.forEach(r => r.status === "rejected" ? failedSync.push(r.reason) : null); - if (failedSync.length) { - console.info("Backup Sync Failed:", failedSync); - ws.send(JSON.stringify({ - command: "SYNC_END", - status: false, - info: failedSync - })); - } else { - console.info("Backup Sync completed"); - ws.send(JSON.stringify({ - command: "SYNC_END", - status: true - })); - } - }); -} - -function send_deleteSync(timestamp, ws) { - return new Promise((resolve, reject) => { - DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { - ws.send(JSON.stringify({ - command: "SYNC_DELETE", - delete_data: result - })); - resolve("deleteSync"); - }).catch(error => { - console.error(error); - reject("deleteSync"); - }); - }) -} - -function send_dataSync(timestamp, ws) { - const sendTable = (table, id_list) => new Promise((res, rej) => { - DB.query(`SELECT * FROM ${table} WHERE id IN (${id_list})`) - .then(data => { - ws.send(JSON.stringify({ - table, - command: "SYNC_UPDATE", - data - })); - res(table); - }).catch(error => { - console.error(error); - rej(table); - }); - }); - return new Promise((resolve, reject) => { - DB.query("SELECT * FROM _backup WHERE mode=TRUE AND timestamp > ?", [timestamp]).then(result => { - let sync_needed = {}; - result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); - ws.send(JSON.stringify({ - command: "SYNC_HEADER", - add_data: result - })); - let promises = []; - for (let table in sync_needed) - promises.push(sendTable(table, sync_needed[table])); - Promise.allSettled(promises).then(result => { - let failedTables = []; - result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); - if (failedTables.length) - reject(["dataSync", failedTables]); - else - resolve("dataSync"); - }); - }).catch(error => { - console.error(error); - reject("dataSync"); - }); - }); -} - //Shares function generateShares(sinkKey) { let nextNodes = nodeKBucket.nextNode(global.myFloID, null), @@ -370,7 +286,10 @@ function startBackupTransmitter(server) { //TODO: check if request time is valid; else switch (request.type) { case "BACKUP_SYNC": - sendBackup(request.last_time, ws); + sync.sendBackupData(request.last_time, request.checksum, ws); + break; + case "RE_SYNC": + sync.sendTableData(request.tables, ws); break; case "UPDATE_MASTER": updateMaster(request.floID); @@ -392,7 +311,6 @@ function startBackupTransmitter(server) { } catch (error) { console.error(error); ws.send(JSON.stringify({ - type: request.type, command: "REQUEST_ERROR", error: 'Unable to process the request!' })); @@ -424,6 +342,7 @@ module.exports = { }, set DB(db) { DB = db; + sync.DB = db; slave.DB = db; }, get wss() { diff --git a/src/backup/slave.js b/src/backup/slave.js index cd4a4f5..1f5d93f 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -2,6 +2,7 @@ const WAIT_TIME = 10 * 60 * 1000, BACKUP_INTERVAL = 1 * 60 * 1000, + CHECKSUM_INTERVAL = 15, //times of BACKUP_INTERVAL SINK_KEY_INDICATOR = '$$$'; var DB; //Container for Database connection @@ -43,7 +44,7 @@ function stopSlaveProcess() { } } -function requestBackupSync(ws) { +function requestBackupSync(checksum_trigger, ws) { return new Promise((resolve, reject) => { DB.query('SELECT MAX(timestamp) as last_time FROM _backup').then(result => { let request = { @@ -51,6 +52,7 @@ function requestBackupSync(ws) { pubKey: global.myPubKey, type: "BACKUP_SYNC", last_time: result[0].last_time, + checksum: checksum_trigger, req_time: Date.now() }; request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); @@ -63,6 +65,7 @@ function requestBackupSync(ws) { const requestInstance = { ws: null, cache: null, + checksum: null, delete_count: null, add_count: null, delete_data: null, @@ -70,7 +73,8 @@ const requestInstance = { total_add: null, request: null, onetime: null, - last_response_time: null + last_response_time: null, + checksum_count_down: 0 }; requestInstance.open = function(ws = null) { @@ -91,7 +95,7 @@ requestInstance.open = function(ws = null) { ws = masterWS; else return console.warn("Not connected to master"); - requestBackupSync(ws).then(request => { + requestBackupSync(!self.checksum_count_down || self.onetime, ws).then(request => { self.request = request; self.cache = []; self.add_count = self.delete_count = 0; @@ -104,9 +108,12 @@ requestInstance.close = function() { const self = this; if (self.onetime) self.ws.close(); + else + self.checksum_count_down = self.checksum_count_down ? self.checksum_count_down - 1 : CHECKSUM_INTERVAL; self.onetime = null; self.ws = null; self.cache = null; + self.checksum = null; self.delete_count = null; self.add_count = null; self.delete_data = null; @@ -124,7 +131,7 @@ function processDataFromMaster(message) { processBackupData(message); else switch (message.command) { case "SINK_SHARE": - storeSinkShare(message); + storeSinkShare(message.sinkID, message.keyShare); break; case "SEND_SHARE": sendSinkShare(message.pubKey); @@ -140,10 +147,10 @@ function processDataFromMaster(message) { } } -function storeSinkShare(message) { - let encryptedShare = Crypto.AES.encrypt(floCrypto.decryptData(message.keyShare, global.myPrivKey), global.myPrivKey); - console.debug(Date.now(), '|sinkID:', message.sinkID, '|EnShare:', encryptedShare); - DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?) AS new ON DUPLICATE KEY UPDATE share=new.share", [message.sinkID, encryptedShare]) +function storeSinkShare(sinkID, keyShare) { + let encryptedShare = Crypto.AES.encrypt(floCrypto.decryptData(keyShare, global.myPrivKey), global.myPrivKey); + console.debug(Date.now(), '|sinkID:', sinkID, '|EnShare:', encryptedShare); + DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?) AS new ON DUPLICATE KEY UPDATE share=new.share", [sinkID, encryptedShare]) .then(_ => null).catch(error => console.error(error)); } @@ -180,10 +187,21 @@ function processBackupData(response) { console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_count} packets not received.`); else console.info("Backup Sync Instance finished successfully"); - storeBackupData(self.cache, self.add_data, self.delete_data); - } else + storeBackupData(self.cache, self.checksum).then(result => { + updateBackupTable(self.add_data, self.delete_data); + if (result) { + console.log("Backup Sync completed successfully"); + self.close(); + } else + console.log("Waiting for come re-sync data"); + }).catch(_ => { + console.warn("Backup Sync was not successful"); + self.close(); + }); + } else { console.info("Backup Sync was not successful! Failed info: ", response.info); - self.close(); + self.close(); + } break; case "SYNC_DELETE": self.delete_data = response.delete_data; @@ -198,6 +216,9 @@ function processBackupData(response) { self.add_count += 1; self.cache.push(cacheBackupData(response.table, response.data)); break; + case "SYNC_CHECKSUM": + self.checksum = response.checksum; + break; } } @@ -209,37 +230,47 @@ const cacheBackupData = (tableName, dataCache) => new Promise((resolve, reject) }) }); -function storeBackupData(cache_promises, add_header, delete_header) { - Promise.allSettled(cache_promises).then(_ => { - console.log("START: BackupCache -> Tables"); - //Process 'Users' table 1st as it provides foreign key attribute to other tables - DB.query("SELECT * FROM _backupCache WHERE t_name=?", ["Users"]).then(data => { - Promise.allSettled(data.map(d => updateTableData("Users", JSON.parse(d.data_cache)))).then(result => { - storeBackupData.commit(data, result).then(_ => { - DB.query("SELECT * FROM _backupCache WHERE t_name IS NOT NULL").then(data => { - Promise.allSettled(data.map(d => updateTableData(d.t_name, JSON.parse(d.data_cache)))).then(result => { - storeBackupData.commit(data, result).then(_ => { - DB.query("SELECT * FROM _backupCache WHERE t_name IS NULL").then(data => { - Promise.allSettled(data.map(d => deleteTableData(JSON.parse(d.data_cache)))).then(result => { - storeBackupData.commit(data, result).then(_ => { - console.log("END: BackupCache -> Tables"); - updateBackupTable(add_header, delete_header); - }); +function storeBackupData(cache_promises, checksum_ref) { + return new Promise((resolve, reject) => { + Promise.allSettled(cache_promises).then(_ => { + console.log("START: BackupCache -> Tables"); + //Process 'Users' table 1st as it provides foreign key attribute to other tables + DB.query("SELECT * FROM _backupCache WHERE t_name=?", ["Users"]).then(data => { + Promise.allSettled(data.map(d => updateTableData("Users", JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NOT NULL").then(data => { + Promise.allSettled(data.map(d => updateTableData(d.t_name, JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NULL").then(data => { + Promise.allSettled(data.map(d => deleteTableData(JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + console.log("END: BackupCache -> Tables"); + if (!checksum_ref) //No checksum verification + resolve(true); + else + verifyChecksum(checksum_ref) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); + }) }) }) }) - }) - }).catch(error => { - console.error(error); - console.warn("ABORT: BackupCache -> Tables") - }); + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables"); + reject(false); + }); + }) }) + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables"); + reject(false); }) - }).catch(error => { - console.error(error); - console.warn("ABORT: BackupCache -> Tables") }) }) + } storeBackupData.commit = function(data, result) { @@ -260,13 +291,13 @@ storeBackupData.commit = function(data, result) { function updateBackupTable(add_data, delete_data) { //update _backup table for added data DB.transaction(add_data.map(r => [ - "INSERT INTO _backup (t_name, id, mode, timestamp) VALUES (?, ?, TRUE, ?) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=?", - [r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)] + "INSERT INTO _backup (t_name, id, mode, timestamp) VALUE (?, ?, TRUE, ?) AS new ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=new.timestamp", + [r.t_name, r.id, validateValue(r.timestamp)] ])).then(_ => null).catch(error => console.error(error)); //update _backup table for deleted data DB.transaction(delete_data.map(r => [ - "INSERT INTO _backup (t_name, id, mode, timestamp) VALUES (?, ?, NULL, ?) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=?", - [r.t_name, r.id, validateValue(r.timestamp), validateValue(r.timestamp)] + "INSERT INTO _backup (t_name, id, mode, timestamp) VALUE (?, ?, NULL, ?) AS new ON DUPLICATE KEY UPDATE mode=NULL, timestamp=new.timestamp", + [r.t_name, r.id, validateValue(r.timestamp)] ])).then(_ => null).catch(error => console.error(error)); } @@ -296,6 +327,47 @@ function updateTableData(table, data) { const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val; +function verifyChecksum(checksum_ref) { + return new Promise((resolve, reject) => { + DB.query("CHECKSUM TABLE " + Object.keys(checksum_ref).join()).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + let mismatch = []; + for (let table in checksum) + if (checksum[table] != checksum_ref[table]) + mismatch.push(table); + console.debug("Mismatch:", mismatch); + if (!mismatch.length) //Checksum of every table is verified. + return resolve(true); + else //If one or more tables checksum is not correct, re-request the table data + Promise.allSettled(mismatch.map(t => DB.query("TRUNCATE " + t))).then(_ => { + requestReSync(mismatch); + resolve(false); + }) + }).catch(error => { + console.error(error); + reject(false); + }) + }) +} + +function requestReSync(tables) { + let self = requestInstance; + let request = { + floID: global.myFloID, + pubKey: global.myPubKey, + type: "RE_SYNC", + tables: tables, + req_time: Date.now() + }; + request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); + self.ws.send(JSON.stringify(request)); + self.request = request; + self.checksum = null; + self.cache = []; + self.total_add = null; + self.add_count = self.delete_count = 0; +} + module.exports = { SINK_KEY_INDICATOR, set DB(db) { diff --git a/src/backup/sync.js b/src/backup/sync.js new file mode 100644 index 0000000..0026a65 --- /dev/null +++ b/src/backup/sync.js @@ -0,0 +1,213 @@ +var DB; //Container for database + +//Backup Transfer +function sendBackupData(timestamp, checksum, ws) { + if (!timestamp) timestamp = 0; + else if (typeof timestamp === "string" && /\.\d{3}Z$/.test(timestamp)) + timestamp = timestamp.substring(0, timestamp.length - 1); + let promises = [ + backupSync_data(timestamp, ws), + backupSync_delete(timestamp, ws) + ]; + if (checksum) + promises.push(backupSync_checksum(ws)); + Promise.allSettled(promises).then(result => { + let failedSync = []; + result.forEach(r => r.status === "rejected" ? failedSync.push(r.reason) : null); + if (failedSync.length) { + console.info("Backup Sync Failed:", failedSync); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: false, + info: failedSync + })); + } else { + console.info("Backup Sync completed"); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: true + })); + } + }); +} + +function backupSync_delete(timestamp, ws) { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { + ws.send(JSON.stringify({ + command: "SYNC_DELETE", + delete_data: result + })); + resolve("deleteSync"); + }).catch(error => { + console.error(error); + reject("deleteSync"); + }); + }) +} + +function backupSync_data(timestamp, ws) { + const sendTable = (table, id_list) => new Promise((res, rej) => { + DB.query(`SELECT * FROM ${table} WHERE id IN (${id_list})`) + .then(data => { + ws.send(JSON.stringify({ + table, + command: "SYNC_UPDATE", + data + })); + res(table); + }).catch(error => { + console.error(error); + rej(table); + }); + }); + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM _backup WHERE mode=TRUE AND timestamp > ?", [timestamp]).then(result => { + let sync_needed = {}; + result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); + ws.send(JSON.stringify({ + command: "SYNC_HEADER", + add_data: result + })); + let promises = []; + for (let table in sync_needed) + promises.push(sendTable(table, sync_needed[table])); + Promise.allSettled(promises).then(result => { + let failedTables = []; + result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); + if (failedTables.length) + reject(["dataSync", failedTables]); + else + resolve("dataSync"); + }); + }).catch(error => { + console.error(error); + reject("dataSync"); + }); + }); +} + +function backupSync_checksum(ws) { + return new Promise((resolve, reject) => { + DB.query("SELECT DISTINCT t_name FROM _backup").then(result => { + let tableList = result.map(r => r['t_name']); + DB.query("CHECKSUM TABLE " + tableList.join()).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + ws.send(JSON.stringify({ + command: "SYNC_CHECKSUM", + checksum: checksum + })); + resolve("checksum"); + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }) + +} + +function sendTableData(tables, ws) { + let promises = [ + tableSync_data(tables, ws), + tableSync_delete(tables, ws), + tableSync_checksum(tables, ws) + ]; + Promise.allSettled(promises).then(result => { + let failedSync = []; + result.forEach(r => r.status === "rejected" ? failedSync.push(r.reason) : null); + if (failedSync.length) { + console.info("Backup Sync Failed:", failedSync); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: false, + info: failedSync + })); + } else { + console.info("Backup Sync completed"); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: true + })); + } + }); +} + +function tableSync_delete(tables, ws) { + return new Promise((resolve, reject) => { + DB.query(`SELECT * FROM _backup WHERE mode is NULL AND t_name IN (${Array(tables.length).fill("?").join()})`, tables).then(result => { + ws.send(JSON.stringify({ + command: "SYNC_DELETE", + delete_data: result + })); + resolve("deleteSync"); + }).catch(error => { + console.error(error); + reject("deleteSync"); + }); + }) +} + +function tableSync_data(tables, ws) { + const sendTable = table => new Promise((res, rej) => { + DB.query(`SELECT * FROM ${table}`) + .then(data => { + ws.send(JSON.stringify({ + table, + command: "SYNC_UPDATE", + data + })); + res(table); + }).catch(error => { + console.error(error); + rej(table); + }); + }); + return new Promise((resolve, reject) => { + DB.query(`SELECT * FROM _backup WHERE mode=TRUE AND t_name IN (${Array(tables.length).fill("?").join()})`, tables).then(result => { + ws.send(JSON.stringify({ + command: "SYNC_HEADER", + add_data: result + })); + Promise.allSettled(tables.map(t => sendTable(t))).then(result => { + let failedTables = []; + result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); + if (failedTables.length) + reject(["dataSync", failedTables]); + else + resolve("dataSync"); + }); + }).catch(error => { + console.error(error); + reject("dataSync"); + }); + }); +} + +function tableSync_checksum(tables, ws) { + return new Promise((resolve, reject) => { + DB.query("CHECKSUM TABLE " + tables.join()).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + ws.send(JSON.stringify({ + command: "SYNC_CHECKSUM", + checksum: checksum + })); + resolve("checksum"); + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }) + +} + +module.exports = { + sendBackupData, + sendTableData, + set DB(db) { + DB = db; + } +} \ No newline at end of file diff --git a/src/price.js b/src/price.js index c738075..55168ac 100644 --- a/src/price.js +++ b/src/price.js @@ -19,7 +19,7 @@ const updateLastTime = asset => lastTime[asset] = Date.now(); //store FLO price in DB every 1 hr function storeRate(asset, rate) { - DB.query("INSERT INTO PriceHistory (asset, rate) VALUE (?, ?)", [asset, rate]) + DB.query("INSERT INTO PriceHistory (asset, rate) VALUE (?, ?)", [asset, rate.toFixed(3)]) .then(_ => null).catch(error => console.error(error)) } setInterval(() => { @@ -116,7 +116,7 @@ function getRates(asset) { resolve(currentRate[asset]); } else if (noSellOrder[asset]) { //No Sell, But Buy available: Increase the price - checkForRatedSellers().then(result => { + checkForRatedSellers(asset).then(result => { if (result) { let tmp_val = currentRate[asset] * (1 + UP_RATE); if (tmp_val <= ratePast24hr * (1 + MAX_UP_PER_DAY)) { @@ -142,7 +142,7 @@ function checkForRatedSellers(asset) { let ratedMin = result[0].max_p * (1 - TOP_RANGE); DB.query("SELECT COUNT(*) as value FROM SellOrder WHERE floID IN (" + " SELECT UserTag.floID FROM UserTag INNER JOIN TagList ON UserTag.tag = TagList.tag" + - " WHERE TagList.sellPriority > ?)", [ratedMin]).then(result => { + " WHERE TagList.sellPriority > ?) AND asset=?", [ratedMin, asset]).then(result => { resolve(result[0].value > 0); }).catch(error => reject(error)) }).catch(error => reject(error))