Fixed: re-sync until hash is matched
This commit is contained in:
parent
f748bc7a4a
commit
5836ba0686
@ -5,6 +5,8 @@ const { H_struct } = require("../data_structure.json");
|
|||||||
const { packet_, _nextNode, _list } = require("./intra");
|
const { packet_, _nextNode, _list } = require("./intra");
|
||||||
const TYPE = require('./message_types.json');
|
const TYPE = require('./message_types.json');
|
||||||
|
|
||||||
|
const SYNC_WAIT_TIME = 5 * 60 * 1000 //5 mins
|
||||||
|
|
||||||
const queueSync = {
|
const queueSync = {
|
||||||
list: {},
|
list: {},
|
||||||
|
|
||||||
@ -12,7 +14,7 @@ const queueSync = {
|
|||||||
if (node_i in this.list)
|
if (node_i in this.list)
|
||||||
return console.debug("Another sync is in process for ", node_i);
|
return console.debug("Another sync is in process for ", node_i);
|
||||||
console.init(`START: Data sync for ${node_i}`)
|
console.init(`START: Data sync for ${node_i}`)
|
||||||
this.list[node_i] = { ws, ts: Date.now(), q: [], cur: [], hashes: {} };
|
this.list[node_i] = { ws, ts: Date.now(), q: [], cur: new Set(), hashes: {} };
|
||||||
DB.createTable(node_i)
|
DB.createTable(node_i)
|
||||||
.then(result => ws.send(packet_.construct({ type_: TYPE.REQ_HASH, node_i })))
|
.then(result => ws.send(packet_.construct({ type_: TYPE.REQ_HASH, node_i })))
|
||||||
.catch(error => console.error(error));
|
.catch(error => console.error(error));
|
||||||
@ -23,11 +25,11 @@ const queueSync = {
|
|||||||
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
||||||
let r = this.list[node_i];
|
let r = this.list[node_i];
|
||||||
r.hashes[block_n] = hash;
|
r.hashes[block_n] = hash;
|
||||||
if (r.cur.length) //atleast a block is already in syncing process
|
if (r.cur.size) //atleast a block is already in syncing process
|
||||||
r.q.push(block_n);
|
r.q.push(block_n);
|
||||||
else {
|
else {
|
||||||
console.debug(`Queue-Sync: Started block sync for ${node_i}#${block_n}`);
|
console.debug(`Queue-Sync: Started block sync for ${node_i}#${block_n}`);
|
||||||
r.cur.push(block_n);
|
r.cur.add(block_n);
|
||||||
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n }))
|
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n }))
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@ -37,32 +39,40 @@ const queueSync = {
|
|||||||
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`);
|
||||||
|
|
||||||
let r = this.list[node_i];
|
let r = this.list[node_i];
|
||||||
let p = r.cur.indexOf(block_n);
|
if (!r.cur.has(block_n))
|
||||||
if (p == -1)
|
|
||||||
return console.warn(`Queue-Sync: Block ${block_n} not currently syncing`);
|
return console.warn(`Queue-Sync: Block ${block_n} not currently syncing`);
|
||||||
r.cur.splice(p, 1);
|
r.cur.delete(block_n);
|
||||||
console.debug(`Queue-Sync: Finished block sync for ${node_i}#${block_n}`);
|
console.debug(`Queue-Sync: Finished block sync for ${node_i}#${block_n}`);
|
||||||
|
|
||||||
if (r.last_block && block_n === r.last_block)
|
if (r.last_block && block_n === r.last_block)
|
||||||
r.last_block_done = true;
|
r.last_block_done = true;
|
||||||
|
|
||||||
if (!r.cur.length) {
|
if (!r.cur.size && r.q.length) {
|
||||||
if (r.q.length) {
|
//request next block
|
||||||
//request next block
|
let n = r.q.shift();
|
||||||
let n = r.q.shift();
|
console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`);
|
||||||
console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`);
|
r.cur.add(n);
|
||||||
r.cur.push(n);
|
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n }))
|
||||||
r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n }))
|
|
||||||
} else if (r.last_block_done) {
|
|
||||||
//last block is synced and queue is empty. close the sync
|
|
||||||
delete this.list[node_i];
|
|
||||||
//indicate next node for ordering
|
|
||||||
_nextNode.send(packet_.construct({
|
|
||||||
type_: TYPE_.ORDER_BACKUP,
|
|
||||||
order: _list.get([node_i])
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
//verify hash of block after timeout
|
||||||
|
setTimeout(() => verifyBlockHash(node_i, block_n, r.hashes[block_n]).then(verified => {
|
||||||
|
if (!verified) //hash mistmatch, resync
|
||||||
|
this.add_block(node_i, block_n, r.hashes[block_n]);
|
||||||
|
else {//hash matched
|
||||||
|
delete r.hashes[block_n];
|
||||||
|
if (!r.cur.size && !r.q.length && r.last_block_done) {
|
||||||
|
if (Object.keys(r.hashes).length)
|
||||||
|
console.warn(`Queue-Sync: queue list is empty, but hash list is not empty for ${node_i}`);
|
||||||
|
//all blocks synced, remove the sync instance
|
||||||
|
delete this.list[node_i];
|
||||||
|
//indicate next node for ordering
|
||||||
|
_nextNode.send(packet_.construct({
|
||||||
|
type_: TYPE_.ORDER_BACKUP,
|
||||||
|
order: _list.get([node_i])
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).catch(error => console.error(error)), SYNC_WAIT_TIME);
|
||||||
},
|
},
|
||||||
|
|
||||||
last_block(node_i, block_n) {
|
last_block(node_i, block_n) {
|
||||||
@ -123,23 +133,33 @@ function sendBlockHashes(node_i, ws) {
|
|||||||
|
|
||||||
//R: check hash and request data if hash mismatch
|
//R: check hash and request data if hash mismatch
|
||||||
function checkBlockHash(node_i, block_n, hash) {
|
function checkBlockHash(node_i, block_n, hash) {
|
||||||
let t_name = _.t_name(node_i);
|
verifyBlockHash(node_i, block_n, hash).then(result => {
|
||||||
getColumns(t_name).then(columns => {
|
if (!result) //Hash mismatch: ReSync block
|
||||||
let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash`
|
queueSync.add_block(node_i, block_n, hash)
|
||||||
+ `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`;
|
|
||||||
Database.query(statement, [block_n]).then(result => {
|
|
||||||
if (!result.length || result[0].hash != hash) {
|
|
||||||
//ReSync Block
|
|
||||||
let clear_statement = `DELETE FROM ${t_name} WHERE `
|
|
||||||
+ `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`;
|
|
||||||
Database.query(clear_statement, [block_n])
|
|
||||||
.then(_ => queueSync.add_block(node_i, block_n, hash))
|
|
||||||
.catch(error => console.error(error))
|
|
||||||
}
|
|
||||||
}).catch(error => console.error(error))
|
|
||||||
}).catch(error => console.error(error))
|
}).catch(error => console.error(error))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function verifyBlockHash(node_i, block_n, hash) {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
let t_name = _.t_name(node_i);
|
||||||
|
getColumns(t_name).then(columns => {
|
||||||
|
let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash`
|
||||||
|
+ `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`;
|
||||||
|
Database.query(statement, [block_n]).then(result => {
|
||||||
|
if (!result.length || result[0].hash != hash) { //Hash mismatch
|
||||||
|
//ReSync Block
|
||||||
|
let clear_statement = `DELETE FROM ${t_name} WHERE `
|
||||||
|
+ `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`;
|
||||||
|
Database.query(clear_statement, [block_n])
|
||||||
|
.then(_ => resolve(false))
|
||||||
|
.catch(error => reject(error))
|
||||||
|
} else //Hash is verified
|
||||||
|
resolve(true);
|
||||||
|
}).catch(error => reject(error))
|
||||||
|
}).catch(error => reject(error))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
//R: set last block number
|
//R: set last block number
|
||||||
function setLastBlock(node_i, block_n) {
|
function setLastBlock(node_i, block_n) {
|
||||||
queueSync.last_block(node_i, block_n);
|
queueSync.last_block(node_i, block_n);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user