diff --git a/src/backup/storage.js b/src/backup/storage.js index b17bfcd..9793512 100644 --- a/src/backup/storage.js +++ b/src/backup/storage.js @@ -1,44 +1,29 @@ 'use strict'; -const config = require('../../args/backup-config.json'); -const floGlobals = require('../../public/floGlobals'); -require('../set_globals'); -require('../lib'); -require('../floCrypto'); -const WebSocket = require('ws'); -const WAIT_TIME = 1 * 60 * 1000, - BACKUP_INTERVAL = 1 * 60 * 1000; +const WAIT_TIME = 30 * 60 * 1000, + BACKUP_INTERVAL = 10 * 60 * 1000; -const myPrivKey = config.private_key, - myPubKey = floCrypto.getPubKeyHex(config.private_key), - myFloID = floCrypto.getFloID(config.private_key); - -const Database = require("../database"); var DB; //Container for Database connection +var masterWS; //Container for Master websocket connection -function startBackupStorage() { - console.log("Logged in as", myFloID); - Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => { - DB = db; - IntervalFunction(); - const BackupInterval = setInterval(IntervalFunction, BACKUP_INTERVAL); - console.log("Backup Storage Started"); - }) -} +var intervalID = null; -function IntervalFunction() { - IntervalFunction.count += 1; - console.log(Date.now().toString(), "Instance #" + IntervalFunction.count); +function startIntervalSync(ws) { + //set masterWS + ws.on('message', processDataFromMaster); + masterWS = ws; + //stop existing sync + stopIntervalSync(); + //start sync requestInstance.open(); + intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL); } -IntervalFunction.count = 0; -function connectToWSServer(url) { - return new Promise((resolve, reject) => { - const ws = new WebSocket(url); - ws.on('open', _ => resolve(ws)); - ws.on('error', _ => reject(error)); - }) +function stopIntervalSync() { + if (intervalID !== null) { + clearInterval(intervalID); + intervalID = null; + } } function requestBackupSync(ws) { @@ -91,20 +76,16 @@ requestInstance.open = function() { else return; } - connectToWSServer(config["main_server_url"]).then(ws => { - requestBackupSync(ws).then(request => { - self.request = request; - self.ws = ws; - ws.on('message', processBackupData) - ws.onclose = _ => console.log("Connection was Interrupted"); - }).catch(error => console.error(error)) + if (!masterWS) + return console.warn("Not connected to master"); + requestBackupSync(masterWS).then(request => { + self.request = request; + self.ws = masterWS; }).catch(error => console.error(error)) } requestInstance.close = function() { const self = this; - self.ws.onclose = () => null; - self.ws.close(); self.ws = null; self.delete_sync = null; self.add_sync = null; @@ -116,52 +97,56 @@ requestInstance.close = function() { self.last_response_time = null; } -function processBackupData(message) { - const self = requestInstance; - self.last_response_time = Date.now(); +function processDataFromMaster(message) { try { - const response = JSON.parse(message); - console.debug(response); - if (response.error) { - console.log(response.error); - self.close(); - return; - } - switch (response.mode) { - case "END": - if (response.status) { - if (self.total_add !== self.add_sync) - console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`); - else - console.info("Backup Sync Instance finished successfully"); - updateBackupTable(self.add_data, self.delete_data) - } else - console.info("Backup Sync was not successful! Failed info: ", response.info); - self.close(); - break; - case "DELETE": - self.delete_data = response.delete_data; - self.delete_sync += 1; - deleteData(response.delete_data); - break; - case "ADD_UPDATE_HEADER": - self.add_data = response.add_data; - self.total_add = Object.keys(response.add_data).length; - break; - case "ADD_UPDATE": - self.add_sync += 1; - addUpdateData(response.table, response.data); - break; - case "ADD_IMMUTABLE": - self.immutable_sync += 1; - addImmutableData(response.table, response.data); - break; - } + message = JSON.parse(message); + console.debug(message); + if (message.mode.startsWith("SYNC")) + processBackupData(message); } catch (error) { console.error(error); } } +function processBackupData(response) { + const self = requestInstance; + self.last_response_time = Date.now(); + switch (response.mode) { + case "SYNC_ERROR": + console.log(response.error); + self.close(); + break; + case "SYNC_END": + if (response.status) { + if (self.total_add !== self.add_sync) + console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`); + else + console.info("Backup Sync Instance finished successfully"); + updateBackupTable(self.add_data, self.delete_data) + } else + console.info("Backup Sync was not successful! Failed info: ", response.info); + self.close(); + break; + case "SYNC_DELETE": + self.delete_data = response.delete_data; + self.delete_sync += 1; + deleteData(response.delete_data); + break; + case "SYNC_ADD_UPDATE_HEADER": + self.add_data = response.add_data; + self.total_add = Object.keys(response.add_data).length; + break; + case "SYNC_ADD_UPDATE": + self.add_sync += 1; + addUpdateData(response.table, response.data); + break; + case "SYNC_ADD_IMMUTABLE": + self.immutable_sync += 1; + addImmutableData(response.table, response.data); + break; + } +} + function updateBackupTable(add_data, delete_data) { //update _backup table for added data DB.transaction(add_data.map(r => [ @@ -212,4 +197,10 @@ function addImmutableData(table, data) { const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val; -startBackupStorage(); \ No newline at end of file +module.exports = { + set DB(db) { + DB = db; + }, + start: startIntervalSync, + stop: stopIntervalSync +} \ No newline at end of file diff --git a/src/backup/transmit.js b/src/backup/transmit.js index c91b987..1b03cdd 100644 --- a/src/backup/transmit.js +++ b/src/backup/transmit.js @@ -1,7 +1,9 @@ 'use strict'; -const WebSocket = require('ws'); +const shareThreshold = 70 / 100; var DB; //Container for database + +//Backup Transfer function sendBackup(timestamp, ws) { if (!timestamp) timestamp = 0; else if (typeof timestamp === "string" && /\.\d{3}Z$/.test(timestamp)) @@ -17,14 +19,14 @@ function sendBackup(timestamp, ws) { if (failedSync.length) { console.info("Backup Sync Failed:", failedSync); ws.send(JSON.stringify({ - mode: "END", + mode: "SYNC_END", status: false, info: failedSync })); } else { console.info("Backup Sync completed"); ws.send(JSON.stringify({ - mode: "END", + mode: "SYNC_END", status: true })); } @@ -35,7 +37,7 @@ function send_deleteSync(timestamp, ws) { return new Promise((resolve, reject) => { DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { ws.send(JSON.stringify({ - mode: "DELETE", + mode: "SYNC_DELETE", delete_data: result })); resolve("deleteSync"); @@ -52,7 +54,7 @@ function send_dataSync(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "ADD_UPDATE", + mode: "SYNC_ADD_UPDATE", data })); res(table); @@ -66,7 +68,7 @@ function send_dataSync(timestamp, ws) { let sync_needed = {}; result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); ws.send(JSON.stringify({ - mode: "ADD_UPDATE_HEADER", + mode: "SYNC_ADD_UPDATE_HEADER", add_data: result })); let promises = []; @@ -99,7 +101,7 @@ function send_dataImmutable(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "ADD_IMMUTABLE", + mode: "SYNC_ADD_IMMUTABLE", data })); res(table); @@ -124,15 +126,28 @@ function send_dataImmutable(timestamp, ws) { }) } -function startBackupTransmitter(db, port, backupIDs) { - DB = db; - this.port = port; - this.backupIDs = backupIDs; +//Shares +function generateNewSink() { + let sink = floCrypto.generateNewID(); + let nextNodes = KB.nextNode(myFloID, null); + let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); + sink.shares = {}; + for (let i in nextNodes) + sink.shares[nextNodes[i]] = shares[i]; + return sink; +} - const wss = this.wss = new WebSocket.Server({ - port: port - }); - this.close = () => wss.close(); +function sendShare(ws, sinkID, share) { + ws.send(JSON.stringify({ + command: "SINK_SHARE", + sinkID, + keyShare + })); +} + +//Transmistter +var nodeList; //Container for (backup) node list +function startBackupTransmitter(wss) { wss.on('connection', ws => { ws.on('message', message => { //verify if from a backup node @@ -158,13 +173,20 @@ function startBackupTransmitter(db, port, backupIDs) { } catch (error) { console.error(error); ws.send(JSON.stringify({ + mode: "SYNC_ERROR", error: 'Unable to process the request!' })); } }); }); - - console.log("Backup Transmitter running in port", port); } -module.exports = startBackupTransmitter; \ No newline at end of file +module.exports = { + init: startBackupTransmitter, + set nodeList(ids) { + nodeList = ids; + }, + set DB(db) { + DB = db; + } +}; \ No newline at end of file