diff --git a/src/backup/sync.js b/src/backup/sync.js index 8c3d308..848d7a3 100644 --- a/src/backup/sync.js +++ b/src/backup/sync.js @@ -5,6 +5,8 @@ const { H_struct } = require("../data_structure.json"); const { packet_, _nextNode, _list } = require("./intra"); const TYPE = require('./message_types.json'); +const SYNC_WAIT_TIME = 5 * 60 * 1000 //5 mins + const queueSync = { list: {}, @@ -12,7 +14,7 @@ const queueSync = { if (node_i in this.list) return console.debug("Another sync is in process for ", node_i); console.init(`START: Data sync for ${node_i}`) - this.list[node_i] = { ws, ts: Date.now(), q: [], cur: [], hashes: {} }; + this.list[node_i] = { ws, ts: Date.now(), q: [], cur: new Set(), hashes: {} }; DB.createTable(node_i) .then(result => ws.send(packet_.construct({ type_: TYPE.REQ_HASH, node_i }))) .catch(error => console.error(error)); @@ -23,11 +25,11 @@ const queueSync = { return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`); let r = this.list[node_i]; r.hashes[block_n] = hash; - if (r.cur.length) //atleast a block is already in syncing process + if (r.cur.size) //atleast a block is already in syncing process r.q.push(block_n); else { console.debug(`Queue-Sync: Started block sync for ${node_i}#${block_n}`); - r.cur.push(block_n); + r.cur.add(block_n); r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n })) } }, @@ -37,32 +39,40 @@ const queueSync = { return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`); let r = this.list[node_i]; - let p = r.cur.indexOf(block_n); - if (p == -1) + if (!r.cur.has(block_n)) return console.warn(`Queue-Sync: Block ${block_n} not currently syncing`); - r.cur.splice(p, 1); + r.cur.delete(block_n); console.debug(`Queue-Sync: Finished block sync for ${node_i}#${block_n}`); if (r.last_block && block_n === r.last_block) r.last_block_done = true; - if (!r.cur.length) { - if (r.q.length) { - //request next block - let n = r.q.shift(); - console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`); - r.cur.push(n); - r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n })) - } else if (r.last_block_done) { - //last block is synced and queue is empty. close the sync - delete this.list[node_i]; - //indicate next node for ordering - _nextNode.send(packet_.construct({ - type_: TYPE_.ORDER_BACKUP, - order: _list.get([node_i]) - })); - } + if (!r.cur.size && r.q.length) { + //request next block + let n = r.q.shift(); + console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`); + r.cur.add(n); + r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n })) } + //verify hash of block after timeout + setTimeout(() => verifyBlockHash(node_i, block_n, r.hashes[block_n]).then(verified => { + if (!verified) //hash mistmatch, resync + this.add_block(node_i, block_n, r.hashes[block_n]); + else {//hash matched + delete r.hashes[block_n]; + if (!r.cur.size && !r.q.length && r.last_block_done) { + if (Object.keys(r.hashes).length) + console.warn(`Queue-Sync: queue list is empty, but hash list is not empty for ${node_i}`); + //all blocks synced, remove the sync instance + delete this.list[node_i]; + //indicate next node for ordering + _nextNode.send(packet_.construct({ + type_: TYPE_.ORDER_BACKUP, + order: _list.get([node_i]) + })); + } + } + }).catch(error => console.error(error)), SYNC_WAIT_TIME); }, last_block(node_i, block_n) { @@ -123,23 +133,33 @@ function sendBlockHashes(node_i, ws) { //R: check hash and request data if hash mismatch function checkBlockHash(node_i, block_n, hash) { - let t_name = _.t_name(node_i); - getColumns(t_name).then(columns => { - let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash` - + `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; - Database.query(statement, [block_n]).then(result => { - if (!result.length || result[0].hash != hash) { - //ReSync Block - let clear_statement = `DELETE FROM ${t_name} WHERE ` - + `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`; - Database.query(clear_statement, [block_n]) - .then(_ => queueSync.add_block(node_i, block_n, hash)) - .catch(error => console.error(error)) - } - }).catch(error => console.error(error)) + verifyBlockHash(node_i, block_n, hash).then(result => { + if (!result) //Hash mismatch: ReSync block + queueSync.add_block(node_i, block_n, hash) }).catch(error => console.error(error)) } +function verifyBlockHash(node_i, block_n, hash) { + return new Promise((resolve, reject) => { + let t_name = _.t_name(node_i); + getColumns(t_name).then(columns => { + let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash` + + `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; + Database.query(statement, [block_n]).then(result => { + if (!result.length || result[0].hash != hash) { //Hash mismatch + //ReSync Block + let clear_statement = `DELETE FROM ${t_name} WHERE ` + + `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`; + Database.query(clear_statement, [block_n]) + .then(_ => resolve(false)) + .catch(error => reject(error)) + } else //Hash is verified + resolve(true); + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + //R: set last block number function setLastBlock(node_i, block_n) { queueSync.last_block(node_i, block_n);