Update hash algo in backup sync
This commit is contained in:
parent
bf692f05a7
commit
9c08160ab4
@ -7,7 +7,7 @@ const TYPE_ = require('./message_types.json');
|
|||||||
const { _list, packet_, _nextNode } = require("./values");
|
const { _list, packet_, _nextNode } = require("./values");
|
||||||
const { SYNC_WAIT_TIME } = require('../_constants')['backup'];
|
const { SYNC_WAIT_TIME } = require('../_constants')['backup'];
|
||||||
|
|
||||||
const SESSION_GROUP_CONCAT_MAX_LENGTH = 100000 //MySQL default max value is 1024, which is too low for block grouping
|
//const SESSION_GROUP_CONCAT_MAX_LENGTH = 100000 //MySQL default max value is 1024, which is too low for block grouping
|
||||||
|
|
||||||
const col_aggregate = (function () {
|
const col_aggregate = (function () {
|
||||||
let ds = require("../data_structure.json");
|
let ds = require("../data_structure.json");
|
||||||
@ -28,7 +28,7 @@ const _x = {
|
|||||||
return `${H_struct.VECTOR_CLOCK} > '${lower_vc}' AND ${H_struct.VECTOR_CLOCK} < '${upper_vc}'`;
|
return `${H_struct.VECTOR_CLOCK} > '${lower_vc}' AND ${H_struct.VECTOR_CLOCK} < '${upper_vc}'`;
|
||||||
},
|
},
|
||||||
get hash_algo_sql() {
|
get hash_algo_sql() {
|
||||||
return `MD5(GROUP_CONCAT(${col_aggregate}))`;
|
return `SUM(CRC32(MD5(${col_aggregate})))`;
|
||||||
},
|
},
|
||||||
t_name(node_i) {
|
t_name(node_i) {
|
||||||
return '_' + node_i;
|
return '_' + node_i;
|
||||||
@ -113,13 +113,13 @@ const queueSync = {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
function setSessionVar() {
|
/*function setSessionVar() {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
Database.query(`SET SESSION group_concat_max_len = ${SESSION_GROUP_CONCAT_MAX_LENGTH}`)
|
Database.query(`SET SESSION group_concat_max_len = ${SESSION_GROUP_CONCAT_MAX_LENGTH}`)
|
||||||
.then(result => resolve(result))
|
.then(result => resolve(result))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}*/
|
||||||
|
|
||||||
//R: request hashes for node_i
|
//R: request hashes for node_i
|
||||||
function requestDataSync(node_i, ws) {
|
function requestDataSync(node_i, ws) {
|
||||||
@ -131,27 +131,27 @@ function sendBlockHashes(node_i, ws) {
|
|||||||
if (!_list.stored.includes(node_i))
|
if (!_list.stored.includes(node_i))
|
||||||
return console.debug(`Block hash is requested for ${node_i}, but its not in stored list`);
|
return console.debug(`Block hash is requested for ${node_i}, but its not in stored list`);
|
||||||
let t_name = _x.t_name(node_i);
|
let t_name = _x.t_name(node_i);
|
||||||
setSessionVar().then(_ => {
|
//setSessionVar().then(_ => {
|
||||||
let statement = `SELECT ${_x.block_calc_sql} AS block_n, ${_x.hash_algo_sql} as hash`
|
let statement = `SELECT ${_x.block_calc_sql} AS block_n, ${_x.hash_algo_sql} as hash`
|
||||||
+ ` FROM ${t_name} GROUP BY block_n ORDER BY block_n`;
|
+ ` FROM ${t_name} GROUP BY block_n ORDER BY block_n`;
|
||||||
let last_block;
|
let last_block;
|
||||||
Database.query_stream(statement, r => {
|
Database.query_stream(statement, r => {
|
||||||
ws.send(packet_.construct({
|
ws.send(packet_.construct({
|
||||||
type_: TYPE_.RES_HASH,
|
type_: TYPE_.RES_HASH,
|
||||||
node_i: node_i,
|
node_i: node_i,
|
||||||
block_n: r.block_n,
|
block_n: r.block_n,
|
||||||
hash: r.hash
|
hash: r.hash
|
||||||
}));
|
}));
|
||||||
last_block = r.block_n;
|
last_block = r.block_n;
|
||||||
}).then(result => {
|
}).then(result => {
|
||||||
console.debug(`Sent ${result} block hashes`);
|
console.debug(`Sent ${result} block hashes`);
|
||||||
ws.send(packet_.construct({
|
ws.send(packet_.construct({
|
||||||
type_: TYPE_.VAL_LAST_BLK,
|
type_: TYPE_.VAL_LAST_BLK,
|
||||||
node_i: node_i,
|
node_i: node_i,
|
||||||
block_n: last_block,
|
block_n: last_block,
|
||||||
}));
|
}));
|
||||||
}).catch(error => console.error(error))
|
}).catch(error => console.error(error))
|
||||||
}).catch(error => console.error(error));
|
//}).catch(error => console.error(error));
|
||||||
}
|
}
|
||||||
|
|
||||||
//R: check hash and request data if hash mismatch
|
//R: check hash and request data if hash mismatch
|
||||||
@ -165,20 +165,20 @@ function checkBlockHash(node_i, block_n, hash) {
|
|||||||
function verifyBlockHash(node_i, block_n, hash) {
|
function verifyBlockHash(node_i, block_n, hash) {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let t_name = _x.t_name(node_i);
|
let t_name = _x.t_name(node_i);
|
||||||
setSessionVar().then(_ => {
|
//setSessionVar().then(_ => {
|
||||||
let statement = `SELECT ${_x.hash_algo_sql} as hash`
|
let statement = `SELECT ${_x.hash_algo_sql} as hash`
|
||||||
+ ` FROM ${t_name} WHERE ${_x.block_calc_sql} = ?`;
|
+ ` FROM ${t_name} WHERE ${_x.block_calc_sql} = ?`;
|
||||||
Database.query(statement, [block_n]).then(result => {
|
Database.query(statement, [block_n]).then(result => {
|
||||||
if (!result.length || result[0].hash != hash) { //Hash mismatch
|
if (!result.length || result[0].hash != hash) { //Hash mismatch
|
||||||
//ReSync Block
|
//ReSync Block
|
||||||
let clear_statement = `DELETE FROM ${t_name} WHERE ${_x.block_range_sql(block_n)}`;
|
let clear_statement = `DELETE FROM ${t_name} WHERE ${_x.block_range_sql(block_n)}`;
|
||||||
Database.query(clear_statement)
|
Database.query(clear_statement)
|
||||||
.then(_ => resolve(false))
|
.then(_ => resolve(false))
|
||||||
.catch(error => reject(error))
|
.catch(error => reject(error))
|
||||||
} else //Hash is verified
|
} else //Hash is verified
|
||||||
resolve(true);
|
resolve(true);
|
||||||
}).catch(error => reject(error))
|
|
||||||
}).catch(error => reject(error))
|
}).catch(error => reject(error))
|
||||||
|
//}).catch(error => reject(error))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user