diff --git a/src/backup/head.js b/src/backup/head.js index 4a70b02..e0f3afb 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -276,9 +276,10 @@ function informLiveNodes(init) { } else console.warn(`Node(${nodes[i]}) is offline`); if (init) { - if (flag === true) + if (flag === true) { collectShares.active = true; - else { + syncRequest(); + } else { //No other node is active (possible 1st node to start exchange) console.debug("Starting the exchange...") let newSink = generateNewSink(); @@ -294,8 +295,19 @@ function informLiveNodes(init) { }); } +function syncRequest(cur = global.myFloID) { + //Sync data from next available node + console.debug("syncRequest", cur); + let nextNode = nodeKBucket.nextNode(cur); + if (!nextNode) + return console.warn("No nodes available to Sync"); + connectWS(nextNode) + .then(ws => slave.syncRequest(ws)) + .catch(_ => syncRequest(nextNode)); +} + function updateMaster(floID) { - let currentMaster = mod === MASTER_MODE ? global.floID : slave.masterWS.floID; + let currentMaster = mod === MASTER_MODE ? global.myFloID : slave.masterWS.floID; if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster)) connectToMaster(); } diff --git a/src/backup/slave.js b/src/backup/slave.js index 2210413..ed670c0 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -1,7 +1,7 @@ 'use strict'; -const WAIT_TIME = 30 * 60 * 1000, - BACKUP_INTERVAL = 10 * 60 * 1000; +const WAIT_TIME = 10 * 60 * 1000, + BACKUP_INTERVAL = 1 * 60 * 1000; var DB; //Container for Database connection var masterWS = null; //Container for Master websocket connection @@ -17,7 +17,7 @@ function startSlaveProcess(ws) { masterWS = ws; //inform master let message = { - floID: global.floID, + floID: global.myFloID, pubKey: global.pubKey, req_time: Date.now(), type: "SLAVE_CONNECT" @@ -46,11 +46,11 @@ function requestBackupSync(ws) { Users: "created", Request_Log: "request_time", Transactions: "tx_time", - priceHistory: "rec_time", + //priceHistory: "rec_time", _backup: "timestamp" }; let subs = []; - for (t in tables) + for (let t in tables) subs.push(`SELECT MAX(${tables[t]}) as ts FROM ${t}`); DB.query(`SELECT MAX(ts) as last_time FROM (${subs.join(' UNION ')}) AS Z`).then(result => { let request = { @@ -61,7 +61,6 @@ function requestBackupSync(ws) { req_time: Date.now() }; request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); - console.debug("REQUEST: ", request); ws.send(JSON.stringify(request)); resolve(request); }).catch(error => reject(error)) @@ -77,10 +76,11 @@ const requestInstance = { add_data: null, total_add: null, request: null, + onetime: null, last_response_time: null }; -requestInstance.open = function() { +requestInstance.open = function(ws = null) { const self = this; //Check if there is an active request if (self.request) { @@ -90,16 +90,25 @@ requestInstance.open = function() { else return; } - if (!masterWS) - return console.warn("Not connected to master"); - requestBackupSync(masterWS).then(request => { + //Use websocket connection if passed, else use masterWS if available + if (ws) { + ws.on('message', processDataFromMaster); + self.onetime = true; + } else if (masterWS) + ws = masterWS; + else return console.warn("Not connected to master"); + + requestBackupSync(ws).then(request => { self.request = request; - self.ws = masterWS; + self.ws = ws; }).catch(error => console.error(error)) } requestInstance.close = function() { const self = this; + if (self.onetime) + self.ws.close(); + self.onetime = null; self.ws = null; self.delete_sync = null; self.add_sync = null; @@ -215,9 +224,11 @@ function deleteData(data) { } function addUpdateData(table, data) { + if (!data.length) + return; let cols = Object.keys(data[0]), _mark = "(" + Array(cols.length).fill('?') + ")"; - values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); + let values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)} AS new` + " ON DUPLICATE KEY UPDATE " + cols.filter(c => c != 'id').map(c => c + " = new." + c); DB.query(statement, values).then(_ => null).catch(error => console.error(error)); @@ -231,7 +242,7 @@ function addImmutableData(table, data) { }; let cols = Object.keys(data[0]), _mark = "(" + Array(cols.length).fill('?') + ")"; - values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); + let values = data.map(r => cols.map(c => validateValue(r[c]))).flat(); let statement = `INSERT INTO ${table} (${cols}) VALUES ${Array(data.length).fill(_mark)}`; if (table in primaryKeys) statement += ` ON DUPLICATE KEY UPDATE ${primaryKeys[table]}=${primaryKeys[table]}`; @@ -248,5 +259,6 @@ module.exports = { return masterWS; }, start: startSlaveProcess, - stop: stopSlaveProcess + stop: stopSlaveProcess, + syncRequest: ws => requestInstance.open(ws) } \ No newline at end of file