From 4b9a5e656bf2fcde54c0d38d57456dc12a7bc878 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 30 Dec 2021 06:30:06 +0530 Subject: [PATCH 1/9] Create KBucket.js --- src/backup/KBucket.js | 460 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 460 insertions(+) create mode 100644 src/backup/KBucket.js diff --git a/src/backup/KBucket.js b/src/backup/KBucket.js new file mode 100644 index 0000000..51932b7 --- /dev/null +++ b/src/backup/KBucket.js @@ -0,0 +1,460 @@ +'use strict'; + +/*Kademlia DHT K-bucket implementation as a binary tree.*/ +/** + * Implementation of a Kademlia DHT k-bucket used for storing + * contact (peer node) information. + * + * @extends EventEmitter + */ +function BuildKBucket(options = {}) { + /** + * `options`: + * `distance`: Function + * `function (firstId, secondId) { return distance }` An optional + * `distance` function that gets two `id` Uint8Arrays + * and return distance (as number) between them. + * `arbiter`: Function (Default: vectorClock arbiter) + * `function (incumbent, candidate) { return contact; }` An optional + * `arbiter` function that givent two `contact` objects with the same `id` + * returns the desired object to be used for updating the k-bucket. For + * more details, see [arbiter function](#arbiter-function). + * `localNodeId`: Uint8Array An optional Uint8Array representing the local node id. + * If not provided, a local node id will be created via `randomBytes(20)`. + * `metadata`: Object (Default: {}) Optional satellite data to include + * with the k-bucket. `metadata` property is guaranteed not be altered by, + * it is provided as an explicit container for users of k-bucket to store + * implementation-specific data. + * `numberOfNodesPerKBucket`: Integer (Default: 20) The number of nodes + * that a k-bucket can contain before being full or split. + * `numberOfNodesToPing`: Integer (Default: 3) The number of nodes to + * ping when a bucket that should not be split becomes full. KBucket will + * emit a `ping` event that contains `numberOfNodesToPing` nodes that have + * not been contacted the longest. + * + * @param {Object=} options optional + */ + + this.localNodeId = options.localNodeId || window.crypto.getRandomValues(new Uint8Array(20)) + this.numberOfNodesPerKBucket = options.numberOfNodesPerKBucket || 20 + this.numberOfNodesToPing = options.numberOfNodesToPing || 3 + this.distance = options.distance || this.distance + // use an arbiter from options or vectorClock arbiter by default + this.arbiter = options.arbiter || this.arbiter + this.metadata = Object.assign({}, options.metadata) + + this.createNode = function() { + return { + contacts: [], + dontSplit: false, + left: null, + right: null + } + } + + this.ensureInt8 = function(name, val) { + if (!(val instanceof Uint8Array)) { + throw new TypeError(name + ' is not a Uint8Array') + } + } + + /** + * @param {Uint8Array} array1 + * @param {Uint8Array} array2 + * @return {Boolean} + */ + this.arrayEquals = function(array1, array2) { + if (array1 === array2) { + return true + } + if (array1.length !== array2.length) { + return false + } + for (let i = 0, length = array1.length; i < length; ++i) { + if (array1[i] !== array2[i]) { + return false + } + } + return true + } + + this.ensureInt8('option.localNodeId as parameter 1', this.localNodeId) + this.root = this.createNode() + + /** + * Default arbiter function for contacts with the same id. Uses + * contact.vectorClock to select which contact to update the k-bucket with. + * Contact with larger vectorClock field will be selected. If vectorClock is + * the same, candidat will be selected. + * + * @param {Object} incumbent Contact currently stored in the k-bucket. + * @param {Object} candidate Contact being added to the k-bucket. + * @return {Object} Contact to updated the k-bucket with. + */ + this.arbiter = function(incumbent, candidate) { + return incumbent.vectorClock > candidate.vectorClock ? incumbent : candidate + } + + /** + * Default distance function. Finds the XOR + * distance between firstId and secondId. + * + * @param {Uint8Array} firstId Uint8Array containing first id. + * @param {Uint8Array} secondId Uint8Array containing second id. + * @return {Number} Integer The XOR distance between firstId + * and secondId. + */ + this.distance = function(firstId, secondId) { + let distance = 0 + let i = 0 + const min = Math.min(firstId.length, secondId.length) + const max = Math.max(firstId.length, secondId.length) + for (; i < min; ++i) { + distance = distance * 256 + (firstId[i] ^ secondId[i]) + } + for (; i < max; ++i) distance = distance * 256 + 255 + return distance + } + + /** + * Adds a contact to the k-bucket. + * + * @param {Object} contact the contact object to add + */ + this.add = function(contact) { + this.ensureInt8('contact.id', (contact || {}).id) + + let bitIndex = 0 + let node = this.root + + while (node.contacts === null) { + // this is not a leaf node but an inner node with 'low' and 'high' + // branches; we will check the appropriate bit of the identifier and + // delegate to the appropriate node for further processing + node = this._determineNode(node, contact.id, bitIndex++) + } + + // check if the contact already exists + const index = this._indexOf(node, contact.id) + if (index >= 0) { + this._update(node, index, contact) + return this + } + + if (node.contacts.length < this.numberOfNodesPerKBucket) { + node.contacts.push(contact) + return this + } + + // the bucket is full + if (node.dontSplit) { + // we are not allowed to split the bucket + // we need to ping the first this.numberOfNodesToPing + // in order to determine if they are alive + // only if one of the pinged nodes does not respond, can the new contact + // be added (this prevents DoS flodding with new invalid contacts) + return this + } + + this._split(node, bitIndex) + return this.add(contact) + } + + /** + * Get the n closest contacts to the provided node id. "Closest" here means: + * closest according to the XOR metric of the contact node id. + * + * @param {Uint8Array} id Contact node id + * @param {Number=} n Integer (Default: Infinity) The maximum number of + * closest contacts to return + * @return {Array} Array Maximum of n closest contacts to the node id + */ + this.closest = function(id, n = Infinity) { + this.ensureInt8('id', id) + + if ((!Number.isInteger(n) && n !== Infinity) || n <= 0) { + throw new TypeError('n is not positive number') + } + + let contacts = [] + + for (let nodes = [this.root], bitIndex = 0; nodes.length > 0 && contacts.length < n;) { + const node = nodes.pop() + if (node.contacts === null) { + const detNode = this._determineNode(node, id, bitIndex++) + nodes.push(node.left === detNode ? node.right : node.left) + nodes.push(detNode) + } else { + contacts = contacts.concat(node.contacts) + } + } + + return contacts + .map(a => [this.distance(a.id, id), a]) + .sort((a, b) => a[0] - b[0]) + .slice(0, n) + .map(a => a[1]) + } + + /** + * Counts the total number of contacts in the tree. + * + * @return {Number} The number of contacts held in the tree + */ + this.count = function() { + // return this.toArray().length + let count = 0 + for (const nodes = [this.root]; nodes.length > 0;) { + const node = nodes.pop() + if (node.contacts === null) nodes.push(node.right, node.left) + else count += node.contacts.length + } + return count + } + + /** + * Determines whether the id at the bitIndex is 0 or 1. + * Return left leaf if `id` at `bitIndex` is 0, right leaf otherwise + * + * @param {Object} node internal object that has 2 leafs: left and right + * @param {Uint8Array} id Id to compare localNodeId with. + * @param {Number} bitIndex Integer (Default: 0) The bit index to which bit + * to check in the id Uint8Array. + * @return {Object} left leaf if id at bitIndex is 0, right leaf otherwise. + */ + this._determineNode = function(node, id, bitIndex) { + // *NOTE* remember that id is a Uint8Array and has granularity of + // bytes (8 bits), whereas the bitIndex is the bit index (not byte) + + // id's that are too short are put in low bucket (1 byte = 8 bits) + // (bitIndex >> 3) finds how many bytes the bitIndex describes + // bitIndex % 8 checks if we have extra bits beyond byte multiples + // if number of bytes is <= no. of bytes described by bitIndex and there + // are extra bits to consider, this means id has less bits than what + // bitIndex describes, id therefore is too short, and will be put in low + // bucket + const bytesDescribedByBitIndex = bitIndex >> 3 + const bitIndexWithinByte = bitIndex % 8 + if ((id.length <= bytesDescribedByBitIndex) && (bitIndexWithinByte !== 0)) { + return node.left + } + + const byteUnderConsideration = id[bytesDescribedByBitIndex] + + // byteUnderConsideration is an integer from 0 to 255 represented by 8 bits + // where 255 is 11111111 and 0 is 00000000 + // in order to find out whether the bit at bitIndexWithinByte is set + // we construct (1 << (7 - bitIndexWithinByte)) which will consist + // of all bits being 0, with only one bit set to 1 + // for example, if bitIndexWithinByte is 3, we will construct 00010000 by + // (1 << (7 - 3)) -> (1 << 4) -> 16 + if (byteUnderConsideration & (1 << (7 - bitIndexWithinByte))) { + return node.right + } + + return node.left + } + + /** + * Get a contact by its exact ID. + * If this is a leaf, loop through the bucket contents and return the correct + * contact if we have it or null if not. If this is an inner node, determine + * which branch of the tree to traverse and repeat. + * + * @param {Uint8Array} id The ID of the contact to fetch. + * @return {Object|Null} The contact if available, otherwise null + */ + this.get = function(id) { + this.ensureInt8('id', id) + + let bitIndex = 0 + + let node = this.root + while (node.contacts === null) { + node = this._determineNode(node, id, bitIndex++) + } + + // index of uses contact id for matching + const index = this._indexOf(node, id) + return index >= 0 ? node.contacts[index] : null + } + + /** + * Returns the index of the contact with provided + * id if it exists, returns -1 otherwise. + * + * @param {Object} node internal object that has 2 leafs: left and right + * @param {Uint8Array} id Contact node id. + * @return {Number} Integer Index of contact with provided id if it + * exists, -1 otherwise. + */ + this._indexOf = function(node, id) { + for (let i = 0; i < node.contacts.length; ++i) { + if (this.arrayEquals(node.contacts[i].id, id)) return i + } + + return -1 + } + + /** + * Removes contact with the provided id. + * + * @param {Uint8Array} id The ID of the contact to remove. + * @return {Object} The k-bucket itself. + */ + this.remove = function(id) { + this.ensureInt8('the id as parameter 1', id) + + let bitIndex = 0 + let node = this.root + + while (node.contacts === null) { + node = this._determineNode(node, id, bitIndex++) + } + + const index = this._indexOf(node, id) + if (index >= 0) { + const contact = node.contacts.splice(index, 1)[0] + } + + return this + } + + /** + * Splits the node, redistributes contacts to the new nodes, and marks the + * node that was split as an inner node of the binary tree of nodes by + * setting this.root.contacts = null + * + * @param {Object} node node for splitting + * @param {Number} bitIndex the bitIndex to which byte to check in the + * Uint8Array for navigating the binary tree + */ + this._split = function(node, bitIndex) { + node.left = this.createNode() + node.right = this.createNode() + + // redistribute existing contacts amongst the two newly created nodes + for (const contact of node.contacts) { + this._determineNode(node, contact.id, bitIndex).contacts.push(contact) + } + + node.contacts = null // mark as inner tree node + + // don't split the "far away" node + // we check where the local node would end up and mark the other one as + // "dontSplit" (i.e. "far away") + const detNode = this._determineNode(node, this.localNodeId, bitIndex) + const otherNode = node.left === detNode ? node.right : node.left + otherNode.dontSplit = true + } + + /** + * Returns all the contacts contained in the tree as an array. + * If this is a leaf, return a copy of the bucket. `slice` is used so that we + * don't accidentally leak an internal reference out that might be + * accidentally misused. If this is not a leaf, return the union of the low + * and high branches (themselves also as arrays). + * + * @return {Array} All of the contacts in the tree, as an array + */ + this.toArray = function() { + let result = [] + for (const nodes = [this.root]; nodes.length > 0;) { + const node = nodes.pop() + if (node.contacts === null) nodes.push(node.right, node.left) + else result = result.concat(node.contacts) + } + return result + } + + /** + * Updates the contact selected by the arbiter. + * If the selection is our old contact and the candidate is some new contact + * then the new contact is abandoned (not added). + * If the selection is our old contact and the candidate is our old contact + * then we are refreshing the contact and it is marked as most recently + * contacted (by being moved to the right/end of the bucket array). + * If the selection is our new contact, the old contact is removed and the new + * contact is marked as most recently contacted. + * + * @param {Object} node internal object that has 2 leafs: left and right + * @param {Number} index the index in the bucket where contact exists + * (index has already been computed in a previous + * calculation) + * @param {Object} contact The contact object to update. + */ + this._update = function(node, index, contact) { + // sanity check + if (!this.arrayEquals(node.contacts[index].id, contact.id)) { + throw new Error('wrong index for _update') + } + + const incumbent = node.contacts[index] + const selection = this.arbiter(incumbent, contact) + // if the selection is our old contact and the candidate is some new + // contact, then there is nothing to do + if (selection === incumbent && incumbent !== contact) return + + node.contacts.splice(index, 1) // remove old contact + node.contacts.push(selection) // add more recent contact version + + } +} + +module.exports = function K_Bucket(masterID, backupList) { + const decodeID = function(floID) { + let k = bitjs.Base58.decode(floID); + k.shift(); + k.splice(-4, 4); + const decodedId = Crypto.util.bytesToHex(k); + const nodeIdBigInt = new BigInteger(decodedId, 16); + const nodeIdBytes = nodeIdBigInt.toByteArrayUnsigned(); + const nodeIdNewInt8Array = new Uint8Array(nodeIdBytes); + return nodeIdNewInt8Array; + }; + const _KB = new BuildKBucket({ + localNodeId: decodeID(masterID) + }); + backupList.forEach(id => _KB.add({ + id: decodeID(id), + floID: id + })); + const orderedList = backupList.map(sn => [_KB.distance(decodeID(masterID), decodeID(sn)), sn]) + .sort((a, b) => a[0] - b[0]) + .map(a => a[1]); + const self = this; + + Object.defineProperty(self, 'order', { + get: () => Array.from(orderedList) + }); + + self.closestNode = function(id, N = 1) { + let decodedId = decodeID(id); + let n = N || orderedList.length; + let cNodes = _KB.closest(decodedId, n) + .map(k => k.floID); + return (N == 1 ? cNodes[0] : cNodes); + }; + + self.isBefore = (source, target) => orderedList.indexOf(target) < orderedList.indexOf(source); + self.isAfter = (source, target) => orderedList.indexOf(target) > orderedList.indexOf(source); + self.isPrev = (source, target) => orderedList.indexOf(target) === orderedList.indexOf(source) - 1; + self.isNext = (source, target) => orderedList.indexOf(target) === orderedList.indexOf(source) + 1; + + self.prevNode = function(id, N = 1) { + let n = N || orderedList.length; + if (!orderedList.includes(id)) + throw Error(`${id} is not in KB list`); + let pNodes = orderedList.slice(0, orderedList.indexOf(id)).slice(-n); + return (N == 1 ? pNodes[0] : pNodes); + }; + + self.nextNode = function(id, N = 1) { + let n = N || orderedList.length; + if (!orderedList.includes(id)) + throw Error(`${id} is not in KB list`); + let nNodes = orderedList.slice(orderedList.indexOf(id) + 1).slice(0, n); + return (N == 1 ? nNodes[0] : nNodes); + }; + +} \ No newline at end of file From c4732f5831801b431a2b1eada371fef966b3a6c3 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 30 Dec 2021 06:33:10 +0530 Subject: [PATCH 2/9] Restoration --- src/backup/storage.js | 155 +++++++++++++++++++---------------------- src/backup/transmit.js | 58 ++++++++++----- 2 files changed, 113 insertions(+), 100 deletions(-) diff --git a/src/backup/storage.js b/src/backup/storage.js index b17bfcd..9793512 100644 --- a/src/backup/storage.js +++ b/src/backup/storage.js @@ -1,44 +1,29 @@ 'use strict'; -const config = require('../../args/backup-config.json'); -const floGlobals = require('../../public/floGlobals'); -require('../set_globals'); -require('../lib'); -require('../floCrypto'); -const WebSocket = require('ws'); -const WAIT_TIME = 1 * 60 * 1000, - BACKUP_INTERVAL = 1 * 60 * 1000; +const WAIT_TIME = 30 * 60 * 1000, + BACKUP_INTERVAL = 10 * 60 * 1000; -const myPrivKey = config.private_key, - myPubKey = floCrypto.getPubKeyHex(config.private_key), - myFloID = floCrypto.getFloID(config.private_key); - -const Database = require("../database"); var DB; //Container for Database connection +var masterWS; //Container for Master websocket connection -function startBackupStorage() { - console.log("Logged in as", myFloID); - Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => { - DB = db; - IntervalFunction(); - const BackupInterval = setInterval(IntervalFunction, BACKUP_INTERVAL); - console.log("Backup Storage Started"); - }) -} +var intervalID = null; -function IntervalFunction() { - IntervalFunction.count += 1; - console.log(Date.now().toString(), "Instance #" + IntervalFunction.count); +function startIntervalSync(ws) { + //set masterWS + ws.on('message', processDataFromMaster); + masterWS = ws; + //stop existing sync + stopIntervalSync(); + //start sync requestInstance.open(); + intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL); } -IntervalFunction.count = 0; -function connectToWSServer(url) { - return new Promise((resolve, reject) => { - const ws = new WebSocket(url); - ws.on('open', _ => resolve(ws)); - ws.on('error', _ => reject(error)); - }) +function stopIntervalSync() { + if (intervalID !== null) { + clearInterval(intervalID); + intervalID = null; + } } function requestBackupSync(ws) { @@ -91,20 +76,16 @@ requestInstance.open = function() { else return; } - connectToWSServer(config["main_server_url"]).then(ws => { - requestBackupSync(ws).then(request => { - self.request = request; - self.ws = ws; - ws.on('message', processBackupData) - ws.onclose = _ => console.log("Connection was Interrupted"); - }).catch(error => console.error(error)) + if (!masterWS) + return console.warn("Not connected to master"); + requestBackupSync(masterWS).then(request => { + self.request = request; + self.ws = masterWS; }).catch(error => console.error(error)) } requestInstance.close = function() { const self = this; - self.ws.onclose = () => null; - self.ws.close(); self.ws = null; self.delete_sync = null; self.add_sync = null; @@ -116,52 +97,56 @@ requestInstance.close = function() { self.last_response_time = null; } -function processBackupData(message) { - const self = requestInstance; - self.last_response_time = Date.now(); +function processDataFromMaster(message) { try { - const response = JSON.parse(message); - console.debug(response); - if (response.error) { - console.log(response.error); - self.close(); - return; - } - switch (response.mode) { - case "END": - if (response.status) { - if (self.total_add !== self.add_sync) - console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`); - else - console.info("Backup Sync Instance finished successfully"); - updateBackupTable(self.add_data, self.delete_data) - } else - console.info("Backup Sync was not successful! Failed info: ", response.info); - self.close(); - break; - case "DELETE": - self.delete_data = response.delete_data; - self.delete_sync += 1; - deleteData(response.delete_data); - break; - case "ADD_UPDATE_HEADER": - self.add_data = response.add_data; - self.total_add = Object.keys(response.add_data).length; - break; - case "ADD_UPDATE": - self.add_sync += 1; - addUpdateData(response.table, response.data); - break; - case "ADD_IMMUTABLE": - self.immutable_sync += 1; - addImmutableData(response.table, response.data); - break; - } + message = JSON.parse(message); + console.debug(message); + if (message.mode.startsWith("SYNC")) + processBackupData(message); } catch (error) { console.error(error); } } +function processBackupData(response) { + const self = requestInstance; + self.last_response_time = Date.now(); + switch (response.mode) { + case "SYNC_ERROR": + console.log(response.error); + self.close(); + break; + case "SYNC_END": + if (response.status) { + if (self.total_add !== self.add_sync) + console.info(`Backup Sync Instance finished!, ${self.total_add - self.add_sync} packets not received.`); + else + console.info("Backup Sync Instance finished successfully"); + updateBackupTable(self.add_data, self.delete_data) + } else + console.info("Backup Sync was not successful! Failed info: ", response.info); + self.close(); + break; + case "SYNC_DELETE": + self.delete_data = response.delete_data; + self.delete_sync += 1; + deleteData(response.delete_data); + break; + case "SYNC_ADD_UPDATE_HEADER": + self.add_data = response.add_data; + self.total_add = Object.keys(response.add_data).length; + break; + case "SYNC_ADD_UPDATE": + self.add_sync += 1; + addUpdateData(response.table, response.data); + break; + case "SYNC_ADD_IMMUTABLE": + self.immutable_sync += 1; + addImmutableData(response.table, response.data); + break; + } +} + function updateBackupTable(add_data, delete_data) { //update _backup table for added data DB.transaction(add_data.map(r => [ @@ -212,4 +197,10 @@ function addImmutableData(table, data) { const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? val.substring(0, val.length - 1) : val; -startBackupStorage(); \ No newline at end of file +module.exports = { + set DB(db) { + DB = db; + }, + start: startIntervalSync, + stop: stopIntervalSync +} \ No newline at end of file diff --git a/src/backup/transmit.js b/src/backup/transmit.js index c91b987..1b03cdd 100644 --- a/src/backup/transmit.js +++ b/src/backup/transmit.js @@ -1,7 +1,9 @@ 'use strict'; -const WebSocket = require('ws'); +const shareThreshold = 70 / 100; var DB; //Container for database + +//Backup Transfer function sendBackup(timestamp, ws) { if (!timestamp) timestamp = 0; else if (typeof timestamp === "string" && /\.\d{3}Z$/.test(timestamp)) @@ -17,14 +19,14 @@ function sendBackup(timestamp, ws) { if (failedSync.length) { console.info("Backup Sync Failed:", failedSync); ws.send(JSON.stringify({ - mode: "END", + mode: "SYNC_END", status: false, info: failedSync })); } else { console.info("Backup Sync completed"); ws.send(JSON.stringify({ - mode: "END", + mode: "SYNC_END", status: true })); } @@ -35,7 +37,7 @@ function send_deleteSync(timestamp, ws) { return new Promise((resolve, reject) => { DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { ws.send(JSON.stringify({ - mode: "DELETE", + mode: "SYNC_DELETE", delete_data: result })); resolve("deleteSync"); @@ -52,7 +54,7 @@ function send_dataSync(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "ADD_UPDATE", + mode: "SYNC_ADD_UPDATE", data })); res(table); @@ -66,7 +68,7 @@ function send_dataSync(timestamp, ws) { let sync_needed = {}; result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); ws.send(JSON.stringify({ - mode: "ADD_UPDATE_HEADER", + mode: "SYNC_ADD_UPDATE_HEADER", add_data: result })); let promises = []; @@ -99,7 +101,7 @@ function send_dataImmutable(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "ADD_IMMUTABLE", + mode: "SYNC_ADD_IMMUTABLE", data })); res(table); @@ -124,15 +126,28 @@ function send_dataImmutable(timestamp, ws) { }) } -function startBackupTransmitter(db, port, backupIDs) { - DB = db; - this.port = port; - this.backupIDs = backupIDs; +//Shares +function generateNewSink() { + let sink = floCrypto.generateNewID(); + let nextNodes = KB.nextNode(myFloID, null); + let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); + sink.shares = {}; + for (let i in nextNodes) + sink.shares[nextNodes[i]] = shares[i]; + return sink; +} - const wss = this.wss = new WebSocket.Server({ - port: port - }); - this.close = () => wss.close(); +function sendShare(ws, sinkID, share) { + ws.send(JSON.stringify({ + command: "SINK_SHARE", + sinkID, + keyShare + })); +} + +//Transmistter +var nodeList; //Container for (backup) node list +function startBackupTransmitter(wss) { wss.on('connection', ws => { ws.on('message', message => { //verify if from a backup node @@ -158,13 +173,20 @@ function startBackupTransmitter(db, port, backupIDs) { } catch (error) { console.error(error); ws.send(JSON.stringify({ + mode: "SYNC_ERROR", error: 'Unable to process the request!' })); } }); }); - - console.log("Backup Transmitter running in port", port); } -module.exports = startBackupTransmitter; \ No newline at end of file +module.exports = { + init: startBackupTransmitter, + set nodeList(ids) { + nodeList = ids; + }, + set DB(db) { + DB = db; + } +}; \ No newline at end of file From 0b70f099b073ecb9e1995c01b4b8f14f1e8258b7 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 30 Dec 2021 06:49:48 +0530 Subject: [PATCH 3/9] Update App Server - Use App server port for Websocket Server. - pause/resume serving and market when node acts as slave/master respectively. --- src/app.js | 71 ++++++++++++++++++++++++++++++++++++++++++++++---- src/market.js | 8 +++--- src/request.js | 21 +++++++++++++-- 3 files changed, 88 insertions(+), 12 deletions(-) diff --git a/src/app.js b/src/app.js index b97a756..ad37d27 100644 --- a/src/app.js +++ b/src/app.js @@ -3,11 +3,17 @@ const express = require('express'); const cookieParser = require("cookie-parser"); const sessions = require('express-session'); const Request = require('./request'); +const WebSocket = require('ws'); const REFRESH_INTERVAL = 5 * 1000; //10 * 60 * 1000; -module.exports = function App(secret, trustedIDs, DB) { +module.exports = function App(secret, DB) { + if (!(this instanceof App)) + return new App(secret, DB); + + var server = null, + wss = null; const app = express(); //session middleware app.use(sessions({ @@ -65,12 +71,67 @@ module.exports = function App(secret, trustedIDs, DB) { app.post('/withdraw-rupee', Request.WithdrawRupee); //Manage user tags (Access to trusted IDs only) - Request.trustedIDs = trustedIDs; + app.post('/add-tag', Request.addUserTag); app.post('/remove-tag', Request.removeUserTag); Request.DB = DB; - Request.periodicProcess(); - let refresher = setInterval(Request.periodicProcess, REFRESH_INTERVAL); - return app; + + //Properties + var periodInstance = null; + let self = this; + + //return server, express-app, wss + Object.defineProperty(self, "server", { + get: () => server + }); + Object.defineProperty(self, "express", { + get: () => app + }); + Object.defineProperty(self, "wss", { + get: () => wss + }); + + //set trustedID for subAdmin requests + Object.defineProperty(self, "trustedIDs", { + set: (ids) => Request.trustedIDs = ids + }); + + //Start (or) Stop servers + self.start = (port) => new Promise(resolve => { + server = app.listen(port, () => { + wss = new WebSocket.Server({ + server + }); + resolve(`Server Running at port ${port}`); + }); + }); + self.stop = () => new Promise(resolve => { + server.close(() => { + server = null; + wss = null; + resolve('Server stopped') + }); + }); + + //(Node is not master) Pause serving the clients + self.pause = () => { + Request.pause(); + if (periodInstance !== null) { + clearInterval(periodInstance); + periodInstance = null; + } + } + + //(Node is master) Resume serving the clients + self.resume = () => { + Request.resume(); + Request.periodicProcess(); + if (periodInstance === null) + periodInstance = setInterval(Request.periodicProcess, REFRESH_INTERVAL); + } + + Object.defineProperty(self, "periodInstance", { + get: () => periodInstance + }); } \ No newline at end of file diff --git a/src/market.js b/src/market.js index 3fe915d..fe3a60f 100644 --- a/src/market.js +++ b/src/market.js @@ -54,10 +54,6 @@ const tokenAPI = { } } -function returnRates() { - return coupling.price.currentRate; -} - function addSellOrder(floID, quantity, min_price) { return new Promise((resolve, reject) => { if (!floID || !floCrypto.validateAddr(floID)) @@ -505,7 +501,9 @@ function blockchainReCheck() { } module.exports = { - returnRates, + get rate() { + return coupling.price.currentRate; + }, addBuyOrder, addSellOrder, cancelOrder, diff --git a/src/request.js b/src/request.js index ccf0511..86f3336 100644 --- a/src/request.js +++ b/src/request.js @@ -21,8 +21,13 @@ INTERNAL.e_code = 500; const oneDay = 1000 * 60 * 60 * 24; const maxSessionTimeout = 60 * oneDay; +var serving; +const INVALID_SERVER_MSG = "Incorrect Server. Please connect to main server."; + function validateRequestFromFloID(request, sign, floID, proxy = true) { return new Promise((resolve, reject) => { + if (!serving) + return reject(INVALID(INVALID_SERVER_MSG)); DB.query("SELECT " + (proxy ? "proxyKey AS pubKey FROM Sessions" : "pubKey FROM Users") + " WHERE floID=?", [floID]).then(result => { if (result.length < 1) return reject(INVALID(proxy ? "Session not active" : "User not registered")); @@ -53,6 +58,8 @@ function storeRequest(floID, req_str, sign) { } function SignUp(req, res) { + if (!serving) + return res.status(INVALID.e_code).send(INVALID_SERVER_MSG); let data = req.body, session = req.session; if (floCrypto.getFloID(data.pubKey) !== data.floID) @@ -249,11 +256,15 @@ function ListTransactions(req, res) { } function getRate(req, res) { - let rate = market.returnRates(); - res.send(`${rate}`); + if (!serving) + res.status(INVALID.e_code).send(INVALID_SERVER_MSG); + else + res.send(`${market.rate}`); } function Account(req, res) { + if (!serving) + return res.status(INVALID.e_code).send(INVALID_SERVER_MSG); const setLogin = function(message) { let randID = floCrypto.randString(16, true); req.session.random = randID; @@ -506,5 +517,11 @@ module.exports = { set DB(db) { DB = db; market.DB = db; + }, + pause() { + serving = false; + }, + resume() { + serving = true; } }; \ No newline at end of file From db9ff02488d52777ff583cbba04f157b7ccd6236 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 30 Dec 2021 06:53:04 +0530 Subject: [PATCH 4/9] Update main.js - fetch data (nodeList, TagList, trustedIDs) from blockchain - Act as slave or master when needed --- src/main.js | 187 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 178 insertions(+), 9 deletions(-) diff --git a/src/main.js b/src/main.js index ba5005a..5166cf4 100644 --- a/src/main.js +++ b/src/main.js @@ -9,6 +9,173 @@ const Database = require("./database"); const App = require('./app'); const PORT = config['port']; +const K_Bucket = require('./backup/KBucket'); +const slave = require('./backup/storage'); +const transmit = require('./backup/transmit'); + +var DB, app; + +var nodeList, nodeURL, nodeKBucket; + +function refreshData(startup = false) { + return new Promise((resolve, reject) => { + refreshDataFromBlockchain().then(result => { + loadDataFromDB(result, startup) + .then(_ => resolve("Data refresh successful")) + .catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function refreshDataFromBlockchain() { + return new Promise((resolve, reject) => { + DB.query("SELECT num FROM lastTx WHERE floID=?", [floGlobals.adminID]).then(result => { + let lastTx = result.length ? result[0].num : 0; + floBlockchainAPI.readData(floGlobals.adminID, { + ignoreOld: lastTx, + sentOnly: true, + pattern: floGlobals.application + }).then(result => { + let promises = [], + nodes_change = false, + trusted_change = false; + result.data.reverse().forEach(data => { + var content = JSON.parse(data)[floGlobals.application]; + //Node List + if (content.Nodes) { + nodes_change = true; + if (content.Nodes.remove) + for (let n of content.Nodes.remove) + promises.push(DB.query("DELETE FROM nodeURL WHERE floID=?", [n])); + if (content.Nodes.add) + for (let n in content.Nodes.add) + promises.push(DB.query("INSERT INTO nodeURL (floID, url) VALUE (?,?) ON DUPLICATE KEY UPDATE url=NEW.url", [n, content.Nodes.add[n]])); + } + //Trusted List + if (content.Trusted) { + trusted_change = true; + if (content.Trusted.remove) + for (let id of content.Trusted.remove) + promises.push(DB.query("DELETE FROM trustedList WHERE floID=?", [id])); + if (content.Trusted.add) + for (let id of content.Trusted.add) + promises.push(DB.query("INSERT INTO trustedList (floID) VALUE (?) ON DUPLICATE KEY UPDATE floID=NEW.floID", [id])); + } + //Tag List with priority and API + if (content.Tag) { + if (content.Tag.remove) + for (let t of content.Tag.remove) + promises.push(DB.query("DELETE FROM TagList WHERE tag=?", [t])); + if (content.Tag.add) + for (let t in content.Tag.add) + promises.push(DB.query("INSERT INTO TagList (tag, sellPriority, buyPriority, api) VALUE (?,?,?,?)", [t, content.Tag.add[t].sellPriority, content.Tag.add[t].buyPriority, content.Tag.add[t].api])); + if (content.Tag.update) + for (let t in content.Tag.update) + for (let a in content.Tag.update[t]) + promises.push(`UPDATE TagList WHERE tag=? SET ${a}=?`, [t, content.Tag.update[t][a]]); + } + }); + promises.push(DB.query("INSERT INTO lastTx (floID, num) VALUE (?, ?) ON DUPLICATE KEY UPDATE num=NEW.num", [floGlobals.adminID, result.totalTxs])); + //Check if all save process were successful + Promise.allSettled(promises).then(results => { + if (results.reduce((a, r) => r.status === "rejected" ? ++a : a, 0)) + console.warn("Some data might not have been saved in database correctly"); + }); + resolve({ + nodes: nodes_change, + trusted: trusted_change + }); + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + +function loadDataFromDB(changes, startup) { + return new Promise((resolve, reject) => { + let promises = []; + if (startup || changes.nodes) + promises.push(loadDataFromDB.nodeList()); + if (startup || changes.trusted) + promises.push(loadDataFromDB.trustedIDs); + Promise.all(promises) + .then(_ => resolve("Data load successful")) + .catch(error => reject(error)) + }) +} + +loadDataFromDB.nodeList = function() { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM nodeList").then(result => { + let nodes = {} + for (let i in result) + nodes[result[i].floID] = result[i]; + nodeURL = nodes; + nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); + nodeList = nodeKBucket.order; + //update dependents + transmit.nodeList = nodeList; + resolve(nodeList); + }).catch(error => reject(error)) + }) +} + +loadDataFromDB.trustedIDs = function() { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM trustedList").then(result => { + let trustedIDs = []; + for (let i in result) + trustedIDs.push(result[i].floID); + //update dependents + app.trustedIDs = trustedIDs; + resolve(trustedIDs); + }).catch(error => reject(error)) + }) +} + +function setDB(db) { + DB = db; + slave.DB = DB; + transmit.DB = DB; +} + +function connectWS(floID) { + let url = nodeURL[floID]; + return new Promise((resolve, reject) => { + const ws = new WebSocket(url); + ws.on('open', _ => resolve(ws)); + ws.on('error', _ => reject(error)); + }) +} + +function connectToMaster(i = 0) { + if (i >= nodeList.length) { + console.error("No master is found, and myFloID is not in list. This should not happen!"); + process.exit(1); + } + let floID = nodeList[i]; + if (floID === myFloID) + serveAsMaster(); + else + connectWS(floID).then(ws => { + ws.floID = floID; + ws.onclose = () => connectToMaster(i); + serveAsSlave(ws); + }).catch(error => { + console.log(`Node(${floID}) is offline`); + connectToMaster(i + 1) + }); +} + +function serveAsMaster() { + app.resume(); + slave.stop(); +} + +function serveAsSlave(masterWS) { + app.pause(); + slave.start(masterWS); +} + module.exports = function startServer(public_dir) { try { var _tmp = require('../args/keys.json'); @@ -35,13 +202,15 @@ module.exports = function startServer(public_dir) { global.PUBLIC_DIR = public_dir; console.debug(PUBLIC_DIR, global.myFloID); - Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(DB => { - const app = App(config['secret'], config['trusted-floIDs'], DB); - app.listen(PORT, () => console.log(`Server Running at port ${PORT}`)); - //start backup - if (config["backup-port"] && config["backup-floIDs"].length) { - var backupTransmitter = require('./backup/transmit'); - backupTransmitter = new backupTransmitter(DB, config["backup-port"], config["backup-floIDs"]); - } - }); + Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => { + setDB(db); + app = new App(config['secret'], DB); + refreshData(true).then(_ => { + app.start(PORT).then(result => { + console.log(result); + transmit.init(app.wss); + connectToMaster(); + }).catch(error => console.error(error)) + }).catch(error => console.error(error)) + }).catch(error => console.error(error)); }; \ No newline at end of file From 9fd1ae49c284d21b16907ddb3d132d5d727827d0 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 14 Jan 2022 04:07:51 +0530 Subject: [PATCH 5/9] move tokenAPI to tokenAPI.js --- src/market.js | 49 --------------------------------------------- src/tokenAPI.js | 53 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 49 deletions(-) create mode 100644 src/tokenAPI.js diff --git a/src/market.js b/src/market.js index fe3a60f..1458959 100644 --- a/src/market.js +++ b/src/market.js @@ -5,55 +5,6 @@ const MINIMUM_BUY_REQUIREMENT = 0.1; var DB; //container for database -const tokenAPI = { - fetch_api: function(apicall) { - return new Promise((resolve, reject) => { - console.log(floGlobals.tokenURL + apicall); - fetch(floGlobals.tokenURL + apicall).then(response => { - if (response.ok) - response.json().then(data => resolve(data)); - else - reject(response) - }).catch(error => reject(error)) - }) - }, - getBalance: function(floID, token = 'rupee') { - return new Promise((resolve, reject) => { - this.fetch_api(`api/v1.0/getFloAddressBalance?token=${token}&floAddress=${floID}`) - .then(result => resolve(result.balance || 0)) - .catch(error => reject(error)) - }) - }, - getTx: function(txID) { - return new Promise((resolve, reject) => { - this.fetch_api(`api/v1.0/getTransactionDetails/${txID}`).then(res => { - if (res.result === "error") - reject(res.description); - else if (!res.parsedFloData) - reject("Data piece (parsedFloData) missing"); - else if (!res.transactionDetails) - reject("Data piece (transactionDetails) missing"); - else - resolve(res); - }).catch(error => reject(error)) - }) - }, - sendToken: function(privKey, amount, message = "", receiverID = floGlobals.adminID, token = 'rupee') { - return new Promise((resolve, reject) => { - let senderID = floCrypto.getFloID(privKey); - if (typeof amount !== "number" || amount <= 0) - return reject("Invalid amount"); - this.getBalance(senderID, token).then(bal => { - if (amount > bal) - return reject("Insufficiant token balance"); - floBlockchainAPI.writeData(senderID, `send ${amount} ${token}# ${message}`, privKey, receiverID) - .then(txid => resolve(txid)) - .catch(error => reject(error)) - }).catch(error => reject(error)) - }); - } -} - function addSellOrder(floID, quantity, min_price) { return new Promise((resolve, reject) => { if (!floID || !floCrypto.validateAddr(floID)) diff --git a/src/tokenAPI.js b/src/tokenAPI.js new file mode 100644 index 0000000..be96954 --- /dev/null +++ b/src/tokenAPI.js @@ -0,0 +1,53 @@ +'use strict'; + +/* Token Operator to send/receive tokens from blockchain using API calls*/ +(function(GLOBAL) { + const tokenAPI = GLOBAL.tokenAPI = { + fetch_api: function(apicall) { + return new Promise((resolve, reject) => { + console.log(floGlobals.tokenURL + apicall); + fetch(floGlobals.tokenURL + apicall).then(response => { + if (response.ok) + response.json().then(data => resolve(data)); + else + reject(response) + }).catch(error => reject(error)) + }) + }, + getBalance: function(floID, token = floGlobals.token) { + return new Promise((resolve, reject) => { + this.fetch_api(`api/v1.0/getFloAddressBalance?token=${token}&floAddress=${floID}`) + .then(result => resolve(result.balance || 0)) + .catch(error => reject(error)) + }) + }, + getTx: function(txID) { + return new Promise((resolve, reject) => { + this.fetch_api(`api/v1.0/getTransactionDetails/${txID}`).then(res => { + if (res.result === "error") + reject(res.description); + else if (!res.parsedFloData) + reject("Data piece (parsedFloData) missing"); + else if (!res.transactionDetails) + reject("Data piece (transactionDetails) missing"); + else + resolve(res); + }).catch(error => reject(error)) + }) + }, + sendToken: function(privKey, amount, message = "", receiverID = floGlobals.adminID, token = floGlobals.token) { + return new Promise((resolve, reject) => { + let senderID = floCrypto.getFloID(privKey); + if (typeof amount !== "number" || amount <= 0) + return reject("Invalid amount"); + this.getBalance(senderID, token).then(bal => { + if (amount > bal) + return reject("Insufficiant token balance"); + floBlockchainAPI.writeData(senderID, `send ${amount} ${token}# ${message}`, privKey, receiverID) + .then(txid => resolve(txid)) + .catch(error => reject(error)) + }).catch(error => reject(error)) + }); + } + } +})(typeof global !== "undefined" ? global : window); \ No newline at end of file From 626c9ec21436bd73152665dbe14b190d8bcdcd31 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 14 Jan 2022 04:08:44 +0530 Subject: [PATCH 6/9] Backup transfer/retrieval process --- src/app.js | 11 +-- src/backup/storage.js | 68 +++++++++++--- src/backup/transmit.js | 205 +++++++++++++++++++++++++++++++++++++---- src/main.js | 44 +-------- 4 files changed, 247 insertions(+), 81 deletions(-) diff --git a/src/app.js b/src/app.js index ad37d27..9e216b4 100644 --- a/src/app.js +++ b/src/app.js @@ -81,16 +81,13 @@ module.exports = function App(secret, DB) { var periodInstance = null; let self = this; - //return server, express-app, wss + //return server, express-app Object.defineProperty(self, "server", { get: () => server }); Object.defineProperty(self, "express", { get: () => app }); - Object.defineProperty(self, "wss", { - get: () => wss - }); //set trustedID for subAdmin requests Object.defineProperty(self, "trustedIDs", { @@ -100,17 +97,13 @@ module.exports = function App(secret, DB) { //Start (or) Stop servers self.start = (port) => new Promise(resolve => { server = app.listen(port, () => { - wss = new WebSocket.Server({ - server - }); resolve(`Server Running at port ${port}`); }); }); self.stop = () => new Promise(resolve => { server.close(() => { server = null; - wss = null; - resolve('Server stopped') + resolve('Server stopped'); }); }); diff --git a/src/backup/storage.js b/src/backup/storage.js index 9793512..2210413 100644 --- a/src/backup/storage.js +++ b/src/backup/storage.js @@ -4,22 +4,36 @@ const WAIT_TIME = 30 * 60 * 1000, BACKUP_INTERVAL = 10 * 60 * 1000; var DB; //Container for Database connection -var masterWS; //Container for Master websocket connection +var masterWS = null; //Container for Master websocket connection var intervalID = null; -function startIntervalSync(ws) { +function startSlaveProcess(ws) { + if (!ws) throw Error("Master WS connection required"); + //stop existing process + stopSlaveProcess(); //set masterWS ws.on('message', processDataFromMaster); masterWS = ws; - //stop existing sync - stopIntervalSync(); + //inform master + let message = { + floID: global.floID, + pubKey: global.pubKey, + req_time: Date.now(), + type: "SLAVE_CONNECT" + } + message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); //start sync requestInstance.open(); intervalID = setInterval(requestInstance.open, BACKUP_INTERVAL); } -function stopIntervalSync() { +function stopSlaveProcess() { + if (masterWS !== null) { + masterWS.onclose = () => null; + masterWS.close(); + masterWS = null; + } if (intervalID !== null) { clearInterval(intervalID); intervalID = null; @@ -40,13 +54,13 @@ function requestBackupSync(ws) { subs.push(`SELECT MAX(${tables[t]}) as ts FROM ${t}`); DB.query(`SELECT MAX(ts) as last_time FROM (${subs.join(' UNION ')}) AS Z`).then(result => { let request = { - floID: myFloID, + floID: global.myFloID, pubKey: myPubKey, type: "BACKUP_SYNC", last_time: result[0].last_time, req_time: Date.now() }; - request.sign = floCrypto.signData(request.type + "|" + request.req_time, myPrivKey); + request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); console.debug("REQUEST: ", request); ws.send(JSON.stringify(request)); resolve(request); @@ -101,17 +115,46 @@ function processDataFromMaster(message) { try { message = JSON.parse(message); console.debug(message); - if (message.mode.startsWith("SYNC")) + if (message.command.startsWith("SYNC")) processBackupData(message); + else switch (message.command) { + case "SINK_SHARE": + storeSinkShare(message); + break; + case "SEND_SHARE": + sendSinkShare(); + break; + } } catch (error) { console.error(error); } } +function storeSinkShare(message) { + console.debug(Date.now(), '|sinkID:', message.sinkID, '|share:', message.keyShare); + DB.query("INSERT INTO sinkShares (floID, share) VALUE (?, ?)", [message.sinkID, message.keyShare]) + .then(_ => null).catch(error => console.error(error)); +} + +function sendSinkShare() { + DB.query("SELECT floID, share FROM sinkShares ORDER BY time_ DESC LIMIT 1").then(result => { + let response = { + type: "SINK_SHARE", + sinkID: result[0].floID, + share: result[0].share, + floID: global.myFloID, + pubKey: global.pubKey, + req_time: Date.now() + } + response.sign = floCrypto.signData(response.type + "|" + response.req_time, global.myPrivKey); //TODO: strengthen signature + masterWS.send(JSON.stringify(response)); + }).catch(error => console.error(error)); +} + function processBackupData(response) { const self = requestInstance; self.last_response_time = Date.now(); - switch (response.mode) { + switch (response.command) { case "SYNC_ERROR": console.log(response.error); self.close(); @@ -201,6 +244,9 @@ module.exports = { set DB(db) { DB = db; }, - start: startIntervalSync, - stop: stopIntervalSync + get masterWS() { + return masterWS; + }, + start: startSlaveProcess, + stop: stopSlaveProcess } \ No newline at end of file diff --git a/src/backup/transmit.js b/src/backup/transmit.js index 1b03cdd..d327b27 100644 --- a/src/backup/transmit.js +++ b/src/backup/transmit.js @@ -1,7 +1,15 @@ 'use strict'; -const shareThreshold = 70 / 100; +const slave = require('./storage'); +const WebSocket = require('ws'); +const shareThreshold = 50 / 100; -var DB; //Container for database +var DB, app, wss; //Container for database and app +var nodeShares = null, + nodeSinkID = null, + connectedSlaves = {}, + mod = null; +const SLAVE_MODE = 0, + MASTER_MODE = 1; //Backup Transfer function sendBackup(timestamp, ws) { @@ -19,14 +27,14 @@ function sendBackup(timestamp, ws) { if (failedSync.length) { console.info("Backup Sync Failed:", failedSync); ws.send(JSON.stringify({ - mode: "SYNC_END", + command: "SYNC_END", status: false, info: failedSync })); } else { console.info("Backup Sync completed"); ws.send(JSON.stringify({ - mode: "SYNC_END", + command: "SYNC_END", status: true })); } @@ -37,7 +45,7 @@ function send_deleteSync(timestamp, ws) { return new Promise((resolve, reject) => { DB.query("SELECT * FROM _backup WHERE mode is NULL AND timestamp > ?", [timestamp]).then(result => { ws.send(JSON.stringify({ - mode: "SYNC_DELETE", + command: "SYNC_DELETE", delete_data: result })); resolve("deleteSync"); @@ -54,7 +62,7 @@ function send_dataSync(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "SYNC_ADD_UPDATE", + command: "SYNC_ADD_UPDATE", data })); res(table); @@ -68,7 +76,7 @@ function send_dataSync(timestamp, ws) { let sync_needed = {}; result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); ws.send(JSON.stringify({ - mode: "SYNC_ADD_UPDATE_HEADER", + command: "SYNC_ADD_UPDATE_HEADER", add_data: result })); let promises = []; @@ -101,7 +109,7 @@ function send_dataImmutable(timestamp, ws) { .then(data => { ws.send(JSON.stringify({ table, - mode: "SYNC_ADD_IMMUTABLE", + command: "SYNC_ADD_IMMUTABLE", data })); res(table); @@ -129,7 +137,7 @@ function send_dataImmutable(timestamp, ws) { //Shares function generateNewSink() { let sink = floCrypto.generateNewID(); - let nextNodes = KB.nextNode(myFloID, null); + let nextNodes = KB.nextNode(global.myFloID, null); let shares = floCrypto.createShamirsSecretShares(sink.privKey, nextNodes.length, Math.ceil(nextNodes.length * shareThreshold)); sink.shares = {}; for (let i in nextNodes) @@ -137,7 +145,7 @@ function generateNewSink() { return sink; } -function sendShare(ws, sinkID, share) { +function sendShare(ws, sinkID, keyShare) { ws.send(JSON.stringify({ command: "SINK_SHARE", sinkID, @@ -145,9 +153,142 @@ function sendShare(ws, sinkID, share) { })); } +function sendSharesToNodes(sinkID, shares) { + nodeSinkID = sinkID; + nodeShares = shares; + for (let node in shares) + if (node in connectedSlaves) + sendShare(connectedSlaves[node], sinkID, shares[node]); +} + +function storeSink(sinkID, sinkPrivKey) { + global.sinkID = sinkID; + global.sinkPrivKey = sinkPrivKey; + let encryptedKey = Crypto.AES.encrypt(sinkPrivKey, global.myPrivKey); + DB.query('INSERT INTO sinkShares (floID, share) VALUE (?, ?)', [sinkID, '$$$' + encryptedKey]) + .then(_ => console.log('SinkID:', sinkID, '|SinkEnKey:', encryptedKey)) + .catch(error => console.error(error)); +} + +function transferMoneyToNewSink(oldSinkID, oldSinkKey) { + return new Promise((resolve, reject) => { + let newSink = generateNewSink(); + floBlockchainAPI.getBalance(oldSinkID).then(balFLO => { + tokenAPI.getBalance(oldSinkID).then(balRupee => { + floBlockchainAPI.sendTx(oldSinkID, newSink.floID, balFLO - floGlobals.fee, oldSinkKey, `send ${balRupee} ${floGlobals.token}# |Exchange-market New sink`) + .then(result => resolve(newSink)) + .catch(error => reject(error)) + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + +function collectShares(floID, sinkID, share) { + if (!collectShares.sinkID) { + collectShares.sinkID = sinkID; + collectShares.shares = {}; + } else if (collectShares.sinkID !== sinkID) + return console.error("Something is wrong! Slaves are sending different sinkID"); + collectShares.shares[floID] = share; + try { + let privKey = floCrypto.retrieveShamirSecret(Object.values(collectShares.shares)); + if (floCrypto.verifyPrivKey(privKey, collectShares.sinkID)) { + transferMoneyToNewSink(collectShares.sinkID, privKey).then(newSink => { + delete collectShares.sinkID; + delete collectShares.shares; + collectShares.active = false; + storeSink(newSink.floID, newSink.privKey); + sendSharesToNodes(newSink.floID, newSink.shares); + }).catch(error => console.error(error)); + } + } catch (error) { + //Unable to retrive sink private key. Waiting for more shares! Do nothing for now + }; +} + +function connectWS(floID) { + let url = nodeURL[floID]; + return new Promise((resolve, reject) => { + const ws = new WebSocket(url); + ws.on('open', _ => resolve(ws)); + ws.on('error', _ => reject(error)); + }) +} + +function connectToMaster(i = 0) { + if (i >= nodeList.length) { + console.error("No master is found, and myFloID is not in list. This should not happen!"); + process.exit(1); + } + let floID = nodeList[i]; + if (floID === myFloID) + serveAsMaster(); + else + connectWS(floID).then(ws => { + ws.floID = floID; + ws.onclose = () => connectToMaster(i); + serveAsSlave(ws); + }).catch(error => { + console.log(`Node(${floID}) is offline`); + connectToMaster(i + 1) + }); +} + +//Node becomes master +function serveAsMaster() { + app.resume(); + slave.stop(); + mod = MASTER_MODE; + informLiveNodes(); + collectShares.active = true; +} + +function serveAsSlave(ws) { + app.pause(); + slave.start(ws); + mod = SLAVE_MODE; +} + +function informLiveNodes() { + let message = { + floID: global.myFloID, + type: "UPDATE_MASTER", + pubKey: global.myPubKey, + req_time: Date.now() + }; + message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); + message = JSON.stringify(message); + for (let n in nodeURL) + if (n !== global.myFloID) + connectWS(n).then(ws => { + ws.send(message); + ws.close(); + }).catch(error => console.warn(`Node ${n} is offline`)); +} + +function updateMaster(floID) { + let currentMaster = mod === MASTER_MODE ? global.floID : slave.masterWS.floID; + if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster)) + connectToMaster(); +} + +function slaveConnect(floID, ws) { + ws.floID = floID; + connectedSlaves[floID] = ws; + if (collectShares.active) + ws.send(JSON.stringify({ + command: "SEND_SHARE" + })); + else if (nodeShares[floID]) + sendShare(ws, nodeSinkID, nodeShares[floID]); +} + //Transmistter var nodeList; //Container for (backup) node list -function startBackupTransmitter(wss) { +function startBackupTransmitter(server) { + wss = new WebSocket.Server({ + server + }); wss.on('connection', ws => { ws.on('message', message => { //verify if from a backup node @@ -155,38 +296,64 @@ function startBackupTransmitter(wss) { let invalid = null, request = JSON.parse(message); console.debug(request); - if (!backupIDs.includes(request.floID)) - invalid = "FLO ID not approved for backup"; + if (!nodeList.includes(request.floID)) + invalid = `floID ${request.floID} not in nodeList`; else if (request.floID !== floCrypto.getFloID(request.pubKey)) invalid = "Invalid pubKey"; else if (!floCrypto.verifySign(request.type + "|" + request.req_time, request.sign, request.pubKey)) invalid = "Invalid signature"; - else if (request.type !== "BACKUP_SYNC") - invalid = "Invalid Request Type"; //TODO: check if request time is valid; + else switch (request.type) { + case "BACKUP_SYNC": + sendBackup(request.last_time, ws); + break; + case "UPDATE_MASTER": + updateMaster(request.floID); + break; + case "SLAVE_CONNECT": + slaveConnect(request.floID, ws); + break; + case "SINK_SHARE": + collectShares(request.floID, request.sinkID, request.share) + default: + invalid = "Invalid Request Type"; + } if (invalid) ws.send(JSON.stringify({ error: invalid })); - else - sendBackup(request.last_time, ws); } catch (error) { console.error(error); ws.send(JSON.stringify({ - mode: "SYNC_ERROR", + command: "SYNC_ERROR", error: 'Unable to process the request!' })); } }); + ws.on('close', () => { + // remove from connected slaves (if needed) + if (ws.floID in connectedSlaves) + delete connectedSlaves[ws.floID]; + }) }); } +function initProcess(a) { + app = a; + startBackupTransmitter(app.server); + connectToMaster(); +} + module.exports = { - init: startBackupTransmitter, + init: initProcess, set nodeList(ids) { nodeList = ids; }, set DB(db) { DB = db; + slave.DB = db; + }, + get wss() { + return wss; } }; \ No newline at end of file diff --git a/src/main.js b/src/main.js index 5166cf4..81c5c45 100644 --- a/src/main.js +++ b/src/main.js @@ -4,13 +4,13 @@ require('./set_globals'); require('./lib'); require('./floCrypto'); require('./floBlockchainAPI'); +require('./tokenAPI'); const Database = require("./database"); const App = require('./app'); const PORT = config['port']; const K_Bucket = require('./backup/KBucket'); -const slave = require('./backup/storage'); const transmit = require('./backup/transmit'); var DB, app; @@ -134,48 +134,9 @@ loadDataFromDB.trustedIDs = function() { function setDB(db) { DB = db; - slave.DB = DB; transmit.DB = DB; } -function connectWS(floID) { - let url = nodeURL[floID]; - return new Promise((resolve, reject) => { - const ws = new WebSocket(url); - ws.on('open', _ => resolve(ws)); - ws.on('error', _ => reject(error)); - }) -} - -function connectToMaster(i = 0) { - if (i >= nodeList.length) { - console.error("No master is found, and myFloID is not in list. This should not happen!"); - process.exit(1); - } - let floID = nodeList[i]; - if (floID === myFloID) - serveAsMaster(); - else - connectWS(floID).then(ws => { - ws.floID = floID; - ws.onclose = () => connectToMaster(i); - serveAsSlave(ws); - }).catch(error => { - console.log(`Node(${floID}) is offline`); - connectToMaster(i + 1) - }); -} - -function serveAsMaster() { - app.resume(); - slave.stop(); -} - -function serveAsSlave(masterWS) { - app.pause(); - slave.start(masterWS); -} - module.exports = function startServer(public_dir) { try { var _tmp = require('../args/keys.json'); @@ -208,8 +169,7 @@ module.exports = function startServer(public_dir) { refreshData(true).then(_ => { app.start(PORT).then(result => { console.log(result); - transmit.init(app.wss); - connectToMaster(); + transmit.init(app); }).catch(error => console.error(error)) }).catch(error => console.error(error)) }).catch(error => console.error(error)); From 0ba0f10245ca76a5b2e562e46225cb4b8028ea78 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 14 Jan 2022 04:11:25 +0530 Subject: [PATCH 7/9] Renaming files --- src/backup/{transmit.js => head.js} | 2 +- src/backup/{storage.js => slave.js} | 0 src/main.js | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) rename src/backup/{transmit.js => head.js} (99%) rename src/backup/{storage.js => slave.js} (100%) diff --git a/src/backup/transmit.js b/src/backup/head.js similarity index 99% rename from src/backup/transmit.js rename to src/backup/head.js index d327b27..898af78 100644 --- a/src/backup/transmit.js +++ b/src/backup/head.js @@ -1,5 +1,5 @@ 'use strict'; -const slave = require('./storage'); +const slave = require('./slave'); const WebSocket = require('ws'); const shareThreshold = 50 / 100; diff --git a/src/backup/storage.js b/src/backup/slave.js similarity index 100% rename from src/backup/storage.js rename to src/backup/slave.js diff --git a/src/main.js b/src/main.js index 81c5c45..695efb8 100644 --- a/src/main.js +++ b/src/main.js @@ -11,7 +11,7 @@ const App = require('./app'); const PORT = config['port']; const K_Bucket = require('./backup/KBucket'); -const transmit = require('./backup/transmit'); +const transmit = require('./backup/head'); var DB, app; From c32833e6253a38b0db054329e2ef9b961f45e7a4 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 14 Jan 2022 04:15:21 +0530 Subject: [PATCH 8/9] Adding application name in floGlobals --- public/floGlobals.js | 1 + src/main.js | 1 + 2 files changed, 2 insertions(+) diff --git a/public/floGlobals.js b/public/floGlobals.js index 6d7bc6b..1153a3c 100644 --- a/public/floGlobals.js +++ b/public/floGlobals.js @@ -8,6 +8,7 @@ const floGlobals = { FLO: ['https://livenet.flocha.in/', 'https://flosight.duckdns.org/'], FLO_TEST: ['https://testnet-flosight.duckdns.org', 'https://testnet.flocha.in/'] }, + application: "exchange", adminID: "FKAEdnPfjXLHSYwrXQu377ugN4tXU7VGdf", sendAmt: 0.001, fee: 0.0005, diff --git a/src/main.js b/src/main.js index 695efb8..cc8f8c3 100644 --- a/src/main.js +++ b/src/main.js @@ -1,3 +1,4 @@ +'use strict'; const config = require('../args/app-config.json'); global.floGlobals = require('../public/floGlobals'); require('./set_globals'); From abb82b0b342274520ab909b29eebd972a7db0d36 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 14 Jan 2022 04:52:41 +0530 Subject: [PATCH 9/9] Minor Fixes - Adding tables to SQL schema - Moved Kbucket usage from main.js to backup/head.js - node Kbucket, ordered NodeList are calculated in backup/head.js --- args/schema.sql | 61 +++++++++++++++++++++++++++------------------- src/backup/head.js | 10 +++++--- src/main.js | 24 +++++++----------- 3 files changed, 52 insertions(+), 43 deletions(-) diff --git a/args/schema.sql b/args/schema.sql index f394531..78610df 100644 --- a/args/schema.sql +++ b/args/schema.sql @@ -112,6 +112,17 @@ PRIMARY KEY(id), FOREIGN KEY (floID) REFERENCES Users(floID) ); +CREATE TABLE nodeList( +floID CHAR(34) NOT NULL, +uri TINYTEXT, +PRIMARY KEY(floID) +); + +CREATE TABLE trustedList( +floID CHAR(34) NOT NULL, +FOREIGN KEY (floID) REFERENCES Users(floID), +); + CREATE TABLE TagList ( id INT NOT NULL AUTO_INCREMENT, tag VARCHAR(50) NOT NULL, @@ -133,27 +144,34 @@ FOREIGN KEY (tag) REFERENCES TagList(tag) ); CREATE TABLE priceHistory ( - rate FLOAT NOT NULL, - rec_time DATETIME DEFAULT CURRENT_TIMESTAMP +rate FLOAT NOT NULL, +rec_time DATETIME DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE auditTransaction( - rec_time DATETIME DEFAULT CURRENT_TIMESTAMP, - unit_price FLOAT NOT NULL, - quantity FLOAT NOT NULL, - total_cost FLOAT NOT NULL, - sellerID CHAR(34) NOT NULL, - FLO_seller_old FLOAT NOT NULL, - FLO_seller_new FLOAT NOT NULL, - Rupee_seller_old FLOAT NOT NULL, - Rupee_seller_new FLOAT NOT NULL, - buyerID CHAR(34) NOT NULL, - FLO_buyer_old FLOAT NOT NULL, - FLO_buyer_new FLOAT NOT NULL, - Rupee_buyer_old FLOAT NOT NULL, - Rupee_buyer_new FLOAT NOT NULL, - FOREIGN KEY (sellerID) REFERENCES Users(floID), - FOREIGN KEY (buyerID) REFERENCES Users(floID) +rec_time DATETIME DEFAULT CURRENT_TIMESTAMP, +unit_price FLOAT NOT NULL, +quantity FLOAT NOT NULL, +total_cost FLOAT NOT NULL, +sellerID CHAR(34) NOT NULL, +FLO_seller_old FLOAT NOT NULL, +FLO_seller_new FLOAT NOT NULL, +Rupee_seller_old FLOAT NOT NULL, +Rupee_seller_new FLOAT NOT NULL, +buyerID CHAR(34) NOT NULL, +FLO_buyer_old FLOAT NOT NULL, +FLO_buyer_new FLOAT NOT NULL, +Rupee_buyer_old FLOAT NOT NULL, +Rupee_buyer_new FLOAT NOT NULL, +FOREIGN KEY (sellerID) REFERENCES Users(floID), +FOREIGN KEY (buyerID) REFERENCES Users(floID) +); + +CREATE TABLE sinkShares( +floID CHAR(34) NOT NULL, +share TEXT, +time_ DATETIME DEFAULT CURRENT_TIMESTAMP, +PRIMARY KEY(floID, share) ); /* Backup feature (Table and Triggers) */ @@ -222,13 +240,6 @@ FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('outputRupee', NEW.id) ON CREATE TRIGGER outputRupee_D AFTER DELETE ON outputRupee FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('outputRupee', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; -CREATE TRIGGER TagList_I AFTER INSERT ON TagList -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TagList', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; -CREATE TRIGGER TagList_U AFTER UPDATE ON TagList -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TagList', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; -CREATE TRIGGER TagList_D AFTER DELETE ON TagList -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('TagList', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, timestamp=DEFAULT; - CREATE TRIGGER Tags_I AFTER INSERT ON Tags FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('Tags', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, timestamp=DEFAULT; CREATE TRIGGER Tags_U AFTER UPDATE ON Tags diff --git a/src/backup/head.js b/src/backup/head.js index 898af78..e64bde5 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -1,9 +1,12 @@ 'use strict'; + +const K_Bucket = require('./KBucket'); const slave = require('./slave'); const WebSocket = require('ws'); const shareThreshold = 50 / 100; var DB, app, wss; //Container for database and app +var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list var nodeShares = null, nodeSinkID = null, connectedSlaves = {}, @@ -284,7 +287,6 @@ function slaveConnect(floID, ws) { } //Transmistter -var nodeList; //Container for (backup) node list function startBackupTransmitter(server) { wss = new WebSocket.Server({ server @@ -346,8 +348,10 @@ function initProcess(a) { module.exports = { init: initProcess, - set nodeList(ids) { - nodeList = ids; + set nodeList(list) { + nodeURL = list; + nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); + nodeList = nodeKBucket.order; }, set DB(db) { DB = db; diff --git a/src/main.js b/src/main.js index cc8f8c3..2c91482 100644 --- a/src/main.js +++ b/src/main.js @@ -11,13 +11,10 @@ const Database = require("./database"); const App = require('./app'); const PORT = config['port']; -const K_Bucket = require('./backup/KBucket'); -const transmit = require('./backup/head'); +const backup = require('./backup/head'); var DB, app; -var nodeList, nodeURL, nodeKBucket; - function refreshData(startup = false) { return new Promise((resolve, reject) => { refreshDataFromBlockchain().then(result => { @@ -47,10 +44,10 @@ function refreshDataFromBlockchain() { nodes_change = true; if (content.Nodes.remove) for (let n of content.Nodes.remove) - promises.push(DB.query("DELETE FROM nodeURL WHERE floID=?", [n])); + promises.push(DB.query("DELETE FROM nodeList WHERE floID=?", [n])); if (content.Nodes.add) for (let n in content.Nodes.add) - promises.push(DB.query("INSERT INTO nodeURL (floID, url) VALUE (?,?) ON DUPLICATE KEY UPDATE url=NEW.url", [n, content.Nodes.add[n]])); + promises.push(DB.query("INSERT INTO nodeList (floID, uri) VALUE (?,?) ON DUPLICATE KEY UPDATE uri=NEW.uri", [n, content.Nodes.add[n]])); } //Trusted List if (content.Trusted) { @@ -106,16 +103,13 @@ function loadDataFromDB(changes, startup) { loadDataFromDB.nodeList = function() { return new Promise((resolve, reject) => { - DB.query("SELECT * FROM nodeList").then(result => { + DB.query("SELECT (floID, uri) FROM nodeList").then(result => { let nodes = {} for (let i in result) - nodes[result[i].floID] = result[i]; - nodeURL = nodes; - nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); - nodeList = nodeKBucket.order; + nodes[result[i].floID] = result[i].uri; //update dependents - transmit.nodeList = nodeList; - resolve(nodeList); + backup.nodeList = nodes; + resolve(nodes); }).catch(error => reject(error)) }) } @@ -135,7 +129,7 @@ loadDataFromDB.trustedIDs = function() { function setDB(db) { DB = db; - transmit.DB = DB; + backup.DB = DB; } module.exports = function startServer(public_dir) { @@ -170,7 +164,7 @@ module.exports = function startServer(public_dir) { refreshData(true).then(_ => { app.start(PORT).then(result => { console.log(result); - transmit.init(app); + backup.init(app); }).catch(error => console.error(error)) }).catch(error => console.error(error)) }).catch(error => console.error(error));