From 49dce2b8b6c5cf0c7f7a8d947bd59bb4fe705d34 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Wed, 21 Dec 2022 03:33:06 +0530 Subject: [PATCH] Bug fixes - moved constants to _constants.js - Fixed: query stream not working with values - Fixed: sync not closing - Fixed: various minor bugs --- src/_constants.js | 19 ++++++++++ src/backup/intra.js | 14 ++++--- src/backup/sync.js | 87 ++++++++++++++++++++++---------------------- src/backup/values.js | 10 +++-- src/client.js | 9 +---- src/database.js | 13 ++++--- src/floGlobals.js | 1 + src/main.js | 2 +- src/server.js | 4 +- 9 files changed, 88 insertions(+), 71 deletions(-) create mode 100644 src/_constants.js diff --git a/src/_constants.js b/src/_constants.js new file mode 100644 index 0000000..aa97f2b --- /dev/null +++ b/src/_constants.js @@ -0,0 +1,19 @@ +const INVALID = function (message) { + if (!(this instanceof INVALID)) + return new INVALID(message); + this.message = message; +} + +module.exports = { + INVALID, + INVALID_E_CODE: 400, + INTERNAL_E_CODE: 500, + + INTERVAL_REFRESH_TIME: 1 * 60 * 60 * 1000, //1 hr + + backup: { + RETRY_TIMEOUT: 5 * 60 * 1000, //5 mins + MIGRATE_WAIT_DELAY: 5 * 60 * 1000, //5 mins + SYNC_WAIT_TIME: 10*1000//5 * 60 * 1000 //5 mins + } +} \ No newline at end of file diff --git a/src/backup/intra.js b/src/backup/intra.js index 640b0dc..c6673d7 100644 --- a/src/backup/intra.js +++ b/src/backup/intra.js @@ -3,13 +3,9 @@ const WebSocket = require('ws'); const { DB } = require('../database'); const keys = require('../keys'); const TYPE_ = require('./message_types.json'); -const { _list, packet_, _nextNode, _prevNode } = require("./values"); +const { _list, packet_, _nextNode, _prevNode, SUPERNODE_INDICATOR } = require("./values"); const sync = require('./sync'); - -//CONSTANTS -const SUPERNODE_INDICATOR = '$', - RETRY_TIMEOUT = 5 * 60 * 1000, //5 mins - MIGRATE_WAIT_DELAY = 5 * 60 * 1000; //5 mins +const { RETRY_TIMEOUT, MIGRATE_WAIT_DELAY } = require('../_constants')['backup']; var refresher; //container for and refresher @@ -94,6 +90,9 @@ function connectToAllActiveNodes(nodes = null) { //-----PROCESS TASKS----- +_nextNode.onmessage = evt => processTaskFromNextNode(evt.data); +_nextNode.onclose = evt => reconnectNextNode(); + //Tasks from next-node function processTaskFromNextNode(packet) { console.debug("_nextNode: ", packet); @@ -127,6 +126,9 @@ function processTaskFromNextNode(packet) { }; }; +_prevNode.onmessage = evt => processTaskFromPrevNode(evt.data); +_prevNode.onclose = evt => _prevNode.close(); + //Tasks from prev-node function processTaskFromPrevNode(packet) { console.debug("_prevNode: ", packet); diff --git a/src/backup/sync.js b/src/backup/sync.js index eeec76f..78922c4 100644 --- a/src/backup/sync.js +++ b/src/backup/sync.js @@ -1,11 +1,20 @@ +'use strict'; const Database = require("../database"); const DB = Database.DB; const floGlobals = require("../floGlobals"); const { H_struct } = require("../data_structure.json"); -const TYPE = require('./message_types.json'); +const TYPE_ = require('./message_types.json'); const { _list, packet_, _nextNode } = require("./values"); +const { SYNC_WAIT_TIME } = require('../_constants')['backup']; -const SYNC_WAIT_TIME = 5 * 60 * 1000 //5 mins +const _ = { + get block_calc_sql() { + return `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.blockInterval}))` + }, + t_name(node_i) { + return '_' + node_i; + } +} const queueSync = { list: {}, @@ -13,10 +22,10 @@ const queueSync = { init(node_i, ws) { if (node_i in this.list) return console.debug("Another sync is in process for ", node_i); - console.init(`START: Data sync for ${node_i}`) + console.info(`START: Data sync for ${node_i}`) this.list[node_i] = { ws, ts: Date.now(), q: [], cur: new Set(), hashes: {} }; DB.createTable(node_i) - .then(result => ws.send(packet_.construct({ type_: TYPE.REQ_HASH, node_i }))) + .then(result => ws.send(packet_.construct({ type_: TYPE_.REQ_HASH, node_i }))) .catch(error => console.error(error)); }, @@ -30,7 +39,7 @@ const queueSync = { else { console.debug(`Queue-Sync: Started block sync for ${node_i}#${block_n}`); r.cur.add(block_n); - r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n })) + r.ws.send(packet_.construct({ type_: TYPE_.REQ_BLOCK, node_i, block_n })) } }, @@ -44,53 +53,44 @@ const queueSync = { r.cur.delete(block_n); console.debug(`Queue-Sync: Finished block sync for ${node_i}#${block_n}`); - if (r.last_block && block_n === r.last_block) - r.last_block_done = true; - if (!r.cur.size && r.q.length) { //request next block let n = r.q.shift(); console.debug(`Queue-Sync: Started block sync for ${node_i}#${n}`); r.cur.add(n); - r.ws.send(packet_.construct({ type_: TYPE.REQ_BLOCK, node_i, block_n: n })) + r.ws.send(packet_.construct({ type_: TYPE_.REQ_BLOCK, node_i, block_n: n })) } //verify hash of block after timeout setTimeout(() => verifyBlockHash(node_i, block_n, r.hashes[block_n]).then(verified => { if (!verified) //hash mistmatch, resync this.add_block(node_i, block_n, r.hashes[block_n]); - else {//hash matched + else //hash matched delete r.hashes[block_n]; - if (!r.cur.size && !r.q.length && r.last_block_done) { - if (Object.keys(r.hashes).length) - console.warn(`Queue-Sync: queue list is empty, but hash list is not empty for ${node_i}`); - //all blocks synced, remove the sync instance - delete this.list[node_i]; - //indicate next node for ordering - _nextNode.send(packet_.construct({ - type_: TYPE_.ORDER_BACKUP, - order: _list.get([node_i]) - })); - } - } }).catch(error => console.error(error)), SYNC_WAIT_TIME); }, last_block(node_i, block_n) { if (!(node_i in this.list)) return console.error(`Queue-Sync: Not active for ${node_i}. Cannot request block ${block_n}`); - this.list[node_i].last_block = block_n; + let r = this.list[node_i]; + r.last_block = block_n; + r.check_interval = setInterval(() => { + if (!r.cur.size && !r.q.length) { + if (Object.keys(r.hashes).length) + console.warn(`Queue-Sync: queue list is empty, but hash list is not empty for ${node_i}`); + //all blocks synced, remove the sync instance + console.info(`END: Data sync for ${node_i}`); + delete this.list[node_i]; + //indicate next node for ordering + _nextNode.send(packet_.construct({ + type_: TYPE_.ORDER_BACKUP, + order: _list.get([node_i]) + })); + } + }, SYNC_WAIT_TIME); } }; -const _ = { - get block_calc_sql() { - return `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n}))` - }, - t_name(node_i) { - return '_' + node_i; - } -} - function getColumns(t_name) { return new Promise((resolve, reject) => { Database.query("SHOW COLUMNS FROM " + t_name) @@ -109,21 +109,21 @@ function sendBlockHashes(node_i, ws) { let t_name = _.t_name(node_i); getColumns(t_name).then(columns => { let statement = `SELECT ${_.block_calc_sql} AS block_n,` - + `MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash` - + `FROM ${t_name} GROUP BY block_n ORDER BY block_n`; + + ` MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash` + + ` FROM ${t_name} GROUP BY block_n ORDER BY block_n`; let last_block; Database.query_stream(statement, r => { ws.send(packet_.construct({ - type_: TYPE.RES_HASH, + type_: TYPE_.RES_HASH, node_i: node_i, block_n: r.block_n, hash: r.hash })); last_block = r.block_n; }).then(result => { - console.debug(`Send ${result} block hashes`); + console.debug(`Sent ${result} block hashes`); ws.send(packet_.construct({ - type_: TYPE.VAL_LAST_BLK, + type_: TYPE_.VAL_LAST_BLK, node_i: node_i, block_n: last_block, })); @@ -144,12 +144,11 @@ function verifyBlockHash(node_i, block_n, hash) { let t_name = _.t_name(node_i); getColumns(t_name).then(columns => { let statement = `SELECT MD5(GROUP_CONCAT(${columns.map(c => `IFNULL(${c}, "NULL")`).join()})) as hash` - + `FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; + + ` FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; Database.query(statement, [block_n]).then(result => { if (!result.length || result[0].hash != hash) { //Hash mismatch //ReSync Block - let clear_statement = `DELETE FROM ${t_name} WHERE ` - + `CEIL(CAST(${H_struct.VECTOR_CLOCK} AS UNSIGNED) / (${floGlobals.sn_config.hash_n})) = ?`; + let clear_statement = `DELETE FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; Database.query(clear_statement, [block_n]) .then(_ => resolve(false)) .catch(error => reject(error)) @@ -169,21 +168,21 @@ function setLastBlock(node_i, block_n) { function sendBlockData(node_i, block_n, ws) { let t_name = _.t_name(node_i); ws.send(packet_.construct({ - type_: TYPE.INDICATE_BLK, + type_: TYPE_.INDICATE_BLK, node_i, block_n, status: true })) console.debug(`START: Sync-send ${node_i}#${block_n} to ${ws.id}`); let statement = `SELECT * FROM ${t_name} WHERE ${_.block_calc_sql} = ?`; Database.query_stream(statement, [block_n], d => ws.send(packet_.construct({ - type_: TYPE.STORE_BACKUP_DATA, + type_: TYPE_.STORE_BACKUP_DATA, data: d }))).then(result => console.debug(`END: Sync-send ${node_i}#${block_n} to ${ws.id} (${result} records)`)) .catch(error => { console.debug(`ERROR: Sync-send ${node_i}#${block_n} to ${ws.id}`) console.error(error); }).finally(_ => ws.send(packet_.construct({ - type_: TYPE.INDICATE_BLK, + type_: TYPE_.INDICATE_BLK, node_i, block_n, status: false }))); @@ -195,7 +194,7 @@ function endBlockSync(node_i, block_n) { } function syncIndicator(node_i, block_n, status, from) { - console.debug(`${status ? 'START' : 'END'}: Sync-receive ${node_i}#${block_n} from ${from}`); + console.debug(`${status ? 'Start' : 'End'}: Sync-receive ${node_i}#${block_n} from ${from}`); if (!status) endBlockSync(node_i, block_n); } diff --git a/src/backup/values.js b/src/backup/values.js index e39e2bc..58f90b2 100644 --- a/src/backup/values.js +++ b/src/backup/values.js @@ -1,3 +1,8 @@ +'use strict'; +const keys = require('../keys'); + +const SUPERNODE_INDICATOR = '$'; + //List of node backups stored const _list = {}; Object.defineProperty(_list, 'delete', { @@ -92,12 +97,8 @@ function NodeContainer() { //Container for next-node const _nextNode = new NodeContainer(); -_nextNode.onmessage = evt => processTaskFromNextNode(evt.data); -_nextNode.onclose = evt => reconnectNextNode(); //Container for prev-node const _prevNode = new NodeContainer(); -_prevNode.onmessage = evt => processTaskFromPrevNode(evt.data); -_prevNode.onclose = evt => _prevNode.close(); //Packet processing const packet_ = {}; @@ -135,4 +136,5 @@ module.exports = { _nextNode, _prevNode, packet_, + SUPERNODE_INDICATOR } \ No newline at end of file diff --git a/src/client.js b/src/client.js index 92a3467..4e2a000 100644 --- a/src/client.js +++ b/src/client.js @@ -1,11 +1,6 @@ const { DB } = require("./database"); const { _list } = require("./backup/values"); - -global.INVALID = function (message) { - if (!(this instanceof INVALID)) - return new INVALID(message); - this.message = message; -} +const { INVALID } = require("./_constants"); function processIncomingData(data) { return new Promise((resolve, reject) => { @@ -37,7 +32,7 @@ function processIncomingData(data) { else return reject(INVALID("Invalid Data-format")); process.then(result => { - console.debug(result); + //console.debug(result); resolve(result); }).catch(error => { (error instanceof INVALID ? console.debug : console.error)(error); diff --git a/src/database.js b/src/database.js index 7da3e58..613741f 100644 --- a/src/database.js +++ b/src/database.js @@ -45,15 +45,15 @@ function queryResolve(sql, values) { }) } -function queryStream(sql, value, callback) { +function queryStream(sql, values, callback) { return new Promise((resolve, reject) => { getConnection().then(conn => { - if (!callback && value instanceof Function) { - callback = value; - value = undefined; + if (!callback && values instanceof Function) { + callback = values; + values = undefined; } var err_flag, row_count = 0; - (value ? conn.query(sql, values) : conn.query(sql)) + (values ? conn.query(sql, values) : conn.query(sql)) .on('error', err => err_flag = err) .on('result', row => { row_count++; @@ -476,11 +476,12 @@ DB.clearAuthorisedAppData = function (snID, app, adminID, subAdmins, timestamp) DB.clearUnauthorisedAppData = function (snID, appList, timestamp) { return new Promise((resolve, reject) => { - let statement = "DELETE FROM _" + snID + + let statement = "SELECT * FROM _" + snID + " WHERE " + H_struct.TIME + " "?").join(", ") + ")" : ""); + console.debug(statement, [timestamp].concat(appList)) queryResolve(statement, [timestamp].concat(appList)) .then(result => resolve(result)) .catch(error => reject(error)); diff --git a/src/floGlobals.js b/src/floGlobals.js index 6b9b829..19e9c75 100644 --- a/src/floGlobals.js +++ b/src/floGlobals.js @@ -16,6 +16,7 @@ const floGlobals = { deleteDelay - (Interger) Maximum duration (milliseconds) an unauthorised data is stored errorFeedback - (Boolean) Send error (if any) feedback to the requestor delayDelta - (Interger) Maximum allowed delay from the data-time + blockInterval - (Interger) Duration (milliseconds) of a single block (for backup sync) */ }; diff --git a/src/main.js b/src/main.js index 4190da6..56ce20e 100644 --- a/src/main.js +++ b/src/main.js @@ -8,9 +8,9 @@ const Database = require("./database"); const intra = require('./backup/intra'); const Server = require('./server'); const keys = require("./keys"); +const { INTERVAL_REFRESH_TIME } = require("./_constants"); const DB = Database.DB; -const INTERVAL_REFRESH_TIME = 1 * 60 * 60 * 1000; //1 hr function startNode() { diff --git a/src/server.js b/src/server.js index 7f1ff0e..1619fb2 100644 --- a/src/server.js +++ b/src/server.js @@ -3,9 +3,7 @@ const WebSocket = require('ws'); const url = require('url'); const intra = require('./backup/intra'); const client = require('./client'); - -const INVALID_E_CODE = 400, - INTERNAL_E_CODE = 500; +const { INVALID, INVALID_E_CODE, INTERNAL_E_CODE } = require('./_constants'); module.exports = function Server(port) {