Restoration

sairajzero 2021-12-30 06:33:10 +05:30
parent 4b9a5e656b
commit c4732f5831
2 changed files with 113 additions and 100 deletions

View File

@@ -1,44 +1,29 @@
'use strict';
const config = require('../../args/backup-config.json');
const floGlobals = require('../../public/floGlobals');
require('../set_globals');
require('../lib');
require('../floCrypto');
const WebSocket = require('ws');
const WAIT_TIME = 1 * 60 * 1000,
BACKUP_INTERVAL = 1 * 60 * 1000;
const WAIT_TIME = 30 * 60 * 1000,
BACKUP_INTERVAL = 10 * 60 * 1000;
const myPrivKey = config.private_key,
myPubKey = floCrypto.getPubKeyHex(config.private_key),
myFloID = floCrypto.getFloID(config.private_key);
const Database = require("../database");
var DB; //Container for Database connection
var masterWS; //Container for Master websocket connection
function startBackupStorage() {
console.log("Logged in as", myFloID);
Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => {
DB = db;
IntervalFunction();
const BackupInterval = setInterval(IntervalFunction, BACKUP_INTERVAL);
console.log("Backup Storage Started");
})
}
var intervalID = null;
function IntervalFunction() {
IntervalFunction.count += 1;
console.log(Date.now().toString(), "Instance #" + IntervalFunction.count);
function startIntervalSync(ws) {
//set masterWS
ws.on('message', processDataFromMaster);
masterWS = ws;
//stop existing sync
stopIntervalSync();
//start sync
requestInstance.open();
intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL);
}
IntervalFunction.count = 0;
function connectToWSServer(url) {
return new Promise((resolve, reject) => {
const ws = new WebSocket(url);
ws.on('open', _ => resolve(ws));
ws.on('error', error => reject(error));
})
function stopIntervalSync() {
if (intervalID !== null) {
clearInterval(intervalID);
intervalID = null;
}
}
function requestBackupSync(ws) {
@@ -91,20 +76,16 @@ requestInstance.open = function() {
else
return;
}
connectToWSServer(config["main_server_url"]).then(ws => {
requestBackupSync(ws).then(request => {
if (!masterWS)
return console.warn("Not connected to master");
requestBackupSync(masterWS).then(request => {
self.request = request;
self.ws = ws;
ws.on('message', processBackupData)
ws.onclose = _ => console.log("Connection was Interrupted");
}).catch(error => console.error(error))
self.ws = masterWS;
}).catch(error => console.error(error))
}
requestInstance.close = function() {
const self = this;
self.ws.onclose = () => null;
self.ws.close();
self.ws = null;
self.delete_sync = null;
self.add_sync = null;
@@ -116,19 +97,26 @@ requestInstance.close = function() {
self.last_response_time = null;
}
function processBackupData(message) {
function processDataFromMaster(message) {
try {
message = JSON.parse(message);
console.debug(message);
if (message.mode.startsWith("SYNC"))
processBackupData(message);
} catch (error) {
console.error(error);
}
}
function processBackupData(response) {
const self = requestInstance;
self.last_response_time = Date.now();
try {
const response = JSON.parse(message);
console.debug(response);
if (response.error) {
switch (response.mode) {
case "SYNC_ERROR":
console.log(response.error);
self.close();
return;
}
switch (response.mode) {
case "END":
break;
case "SYNC_END":
if (response.status) {
if (self.total_add !== self.add_sync)
console.info(`Backup Sync Instance finished! ${self.total_add - self.add_sync} packets not received.`);
@@ -139,27 +127,24 @@ function processBackupData(message) {
console.info("Backup Sync was not successful! Failed info: ", response.info);
self.close();
break;
case "DELETE":
case "SYNC_DELETE":
self.delete_data = response.delete_data;
self.delete_sync += 1;
deleteData(response.delete_data);
break;
case "ADD_UPDATE_HEADER":
case "SYNC_ADD_UPDATE_HEADER":
self.add_data = response.add_data;
self.total_add = Object.keys(response.add_data).length;
break;
case "ADD_UPDATE":
case "SYNC_ADD_UPDATE":
self.add_sync += 1;
addUpdateData(response.table, response.data);
break;
case "ADD_IMMUTABLE":
case "SYNC_ADD_IMMUTABLE":
self.immutable_sync += 1;
addImmutableData(response.table, response.data);
break;
}
} catch (error) {
console.error(error);
}
}
function updateBackupTable(add_data, delete_data) {
@@ -212,4 +197,10 @@ function addImmutableData(table, data) {
const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val;
startBackupStorage();
module.exports = {
set DB(db) {
DB = db;
},
start: startIntervalSync,
stop: stopIntervalSync
}
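The module now exposes a DB setter plus start/stop instead of opening its own database and WebSocket connections. A minimal wiring sketch from a hypothetical caller, assuming the module path './backup/storage', the same '../database' connector signature used above, and placeholder credentials and master URL (all assumptions, not part of this commit):

'use strict';
const WebSocket = require('ws');
const Database = require('../database');
const backupSync = require('./backup/storage'); // assumed path to the module above

const SQL = { user: 'user', pwd: 'pwd', db: 'db', host: 'localhost' }; // placeholder credentials
const MASTER_URL = 'wss://master.example.com'; // placeholder master-node URL

Database(SQL.user, SQL.pwd, SQL.db, SQL.host).then(db => {
    backupSync.DB = db; // inject the database connection via the exported setter
    const ws = new WebSocket(MASTER_URL); // socket to the master node
    ws.on('open', () => backupSync.start(ws)); // begin interval sync over this socket
    ws.on('close', () => backupSync.stop()); // halt the sync loop if the link drops
});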

View File

@@ -1,7 +1,9 @@
'use strict';
const WebSocket = require('ws');
const shareThreshold = 70 / 100;
var DB; //Container for database
//Backup Transfer
function sendBackup(timestamp, ws) {
if (!timestamp) timestamp = 0;
else if (typeof timestamp === "string" && /\.\d{3}Z$/.test(timestamp))
@@ -17,14 +19,14 @@ function sendBackup(timestamp, ws) {
if (failedSync.length) {
console.info("Backup Sync Failed:", failedSync);
ws.send(JSON.stringify({
mode: "END",
mode: "SYNC_END",
status: false,
info: failedSync
}));
} else {
console.info("Backup Sync completed");
ws.send(JSON.stringify({
mode: "END",
mode: "SYNC_END",
status: true
}));
}
@@ -35,7 +37,7 @@ function send_deleteSync(timestamp, ws) {
return new Promise((resolve, reject) => {
DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => {
ws.send(JSON.stringify({
mode: "DELETE",
mode: "SYNC_DELETE",
delete_data: result
}));
resolve("deleteSync");
@@ -52,7 +54,7 @@ function send_dataSync(timestamp, ws) {
.then(data => {
ws.send(JSON.stringify({
table,
mode: "ADD_UPDATE",
mode: "SYNC_ADD_UPDATE",
data
}));
res(table);
@@ -66,7 +68,7 @@ function send_dataSync(timestamp, ws) {
let sync_needed = {};
result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]);
ws.send(JSON.stringify({
mode: "ADD_UPDATE_HEADER",
mode: "SYNC_ADD_UPDATE_HEADER",
add_data: result
}));
let promises = [];
@@ -99,7 +101,7 @@ function send_dataImmutable(timestamp, ws) {
.then(data => {
ws.send(JSON.stringify({
table,
mode: "ADD_IMMUTABLE",
mode: "SYNC_ADD_IMMUTABLE",
data
}));
res(table);
@@ -124,15 +126,28 @@ function send_dataImmutable(timestamp, ws) {
})
}
function startBackupTransmitter(db, port, backupIDs) {
DB = db;
this.port = port;
this.backupIDs = backupIDs;
//Shares
function generateNewSink() {
let sink = floCrypto.generateNewID();
let nextNodes = KB.nextNode(myFloID, null);
let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold));
sink.shares = {};
for (let i in nextNodes)
sink.shares[nextNodes[i]] = shares[i];
return sink;
}
const wss = this.wss = new WebSocket.Server({
port: port
});
this.close = () => wss.close();
function sendShare(ws, sinkID, keyShare) {
ws.send(JSON.stringify({
command: "SINK_SHARE",
sinkID,
keyShare
}));
}
//Transmitter
var nodeList; //Container for (backup) node list
function startBackupTransmitter(wss) {
wss.on('connection', ws => {
ws.on('message', message => {
//verify if from a backup node
@@ -158,13 +173,20 @@ function startBackupTransmitter(db, port, backupIDs) {
} catch (error) {
console.error(error);
ws.send(JSON.stringify({
mode: "SYNC_ERROR",
error: 'Unable to process the request!'
}));
}
});
});
console.log("Backup Transmitter running in port", port);
}
module.exports = startBackupTransmitter;
module.exports = {
init: startBackupTransmitter,
set nodeList(ids) {
nodeList = ids;
},
set DB(db) {
DB = db;
}
};
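The transmitter is likewise driven by a caller that supplies the WebSocket server, the allowed node list and the database handle. A minimal wiring sketch, assuming the module path './backup/transmitter' and placeholder values for the port, credentials and backup-node FLO IDs (all assumptions, not part of this commit):

'use strict';
const WebSocket = require('ws');
const Database = require('../database');
const backupTransmitter = require('./backup/transmitter'); // assumed path to the module above

const SQL = { user: 'user', pwd: 'pwd', db: 'db', host: 'localhost' }; // placeholder credentials
const PORT = 8080; // placeholder port for the backup WebSocket server
const BACKUP_NODES = ['<floID-1>', '<floID-2>']; // placeholder FLO IDs of authorised backup nodes

Database(SQL.user, SQL.pwd, SQL.db, SQL.host).then(db => {
    backupTransmitter.DB = db; // inject the database connection
    backupTransmitter.nodeList = BACKUP_NODES; // nodes allowed to request a backup sync
    const wss = new WebSocket.Server({ port: PORT }); // the caller now owns the server socket
    backupTransmitter.init(wss); // attach the transmitter's connection handler
});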