From db9ff02488d52777ff583cbba04f157b7ccd6236 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 30 Dec 2021 06:53:04 +0530 Subject: [PATCH] Update main.js - fetch data (nodeList, TagList, trustedIDs) from blockchain - Act as slave or master when needed --- src/main.js | 187 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 178 insertions(+), 9 deletions(-) diff --git a/src/main.js b/src/main.js index ba5005a..5166cf4 100644 --- a/src/main.js +++ b/src/main.js @@ -9,6 +9,173 @@ const Database = require("./database"); const App = require('./app'); const PORT = config['port']; +const K_Bucket = require('./backup/KBucket'); +const slave = require('./backup/storage'); +const transmit = require('./backup/transmit'); + +var DB, app; + +var nodeList, nodeURL, nodeKBucket; + +function refreshData(startup = false) { + return new Promise((resolve, reject) => { + refreshDataFromBlockchain().then(result => { + loadDataFromDB(result, startup) + .then(_ => resolve("Data refresh successful")) + .catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function refreshDataFromBlockchain() { + return new Promise((resolve, reject) => { + DB.query("SELECT num FROM lastTx WHERE floID=?", [floGlobals.adminID]).then(result => { + let lastTx = result.length ? result[0].num : 0; + floBlockchainAPI.readData(floGlobals.adminID, { + ignoreOld: lastTx, + sentOnly: true, + pattern: floGlobals.application + }).then(result => { + let promises = [], + nodes_change = false, + trusted_change = false; + result.data.reverse().forEach(data => { + var content = JSON.parse(data)[floGlobals.application]; + //Node List + if (content.Nodes) { + nodes_change = true; + if (content.Nodes.remove) + for (let n of content.Nodes.remove) + promises.push(DB.query("DELETE FROM nodeURL WHERE floID=?", [n])); + if (content.Nodes.add) + for (let n in content.Nodes.add) + promises.push(DB.query("INSERT INTO nodeURL (floID, url) VALUE (?,?) ON DUPLICATE KEY UPDATE url=NEW.url", [n, content.Nodes.add[n]])); + } + //Trusted List + if (content.Trusted) { + trusted_change = true; + if (content.Trusted.remove) + for (let id of content.Trusted.remove) + promises.push(DB.query("DELETE FROM trustedList WHERE floID=?", [id])); + if (content.Trusted.add) + for (let id of content.Trusted.add) + promises.push(DB.query("INSERT INTO trustedList (floID) VALUE (?) ON DUPLICATE KEY UPDATE floID=NEW.floID", [id])); + } + //Tag List with priority and API + if (content.Tag) { + if (content.Tag.remove) + for (let t of content.Tag.remove) + promises.push(DB.query("DELETE FROM TagList WHERE tag=?", [t])); + if (content.Tag.add) + for (let t in content.Tag.add) + promises.push(DB.query("INSERT INTO TagList (tag, sellPriority, buyPriority, api) VALUE (?,?,?,?)", [t, content.Tag.add[t].sellPriority, content.Tag.add[t].buyPriority, content.Tag.add[t].api])); + if (content.Tag.update) + for (let t in content.Tag.update) + for (let a in content.Tag.update[t]) + promises.push(`UPDATE TagList WHERE tag=? SET ${a}=?`, [t, content.Tag.update[t][a]]); + } + }); + promises.push(DB.query("INSERT INTO lastTx (floID, num) VALUE (?, ?) ON DUPLICATE KEY UPDATE num=NEW.num", [floGlobals.adminID, result.totalTxs])); + //Check if all save process were successful + Promise.allSettled(promises).then(results => { + if (results.reduce((a, r) => r.status === "rejected" ? ++a : a, 0)) + console.warn("Some data might not have been saved in database correctly"); + }); + resolve({ + nodes: nodes_change, + trusted: trusted_change + }); + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + +function loadDataFromDB(changes, startup) { + return new Promise((resolve, reject) => { + let promises = []; + if (startup || changes.nodes) + promises.push(loadDataFromDB.nodeList()); + if (startup || changes.trusted) + promises.push(loadDataFromDB.trustedIDs); + Promise.all(promises) + .then(_ => resolve("Data load successful")) + .catch(error => reject(error)) + }) +} + +loadDataFromDB.nodeList = function() { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM nodeList").then(result => { + let nodes = {} + for (let i in result) + nodes[result[i].floID] = result[i]; + nodeURL = nodes; + nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); + nodeList = nodeKBucket.order; + //update dependents + transmit.nodeList = nodeList; + resolve(nodeList); + }).catch(error => reject(error)) + }) +} + +loadDataFromDB.trustedIDs = function() { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM trustedList").then(result => { + let trustedIDs = []; + for (let i in result) + trustedIDs.push(result[i].floID); + //update dependents + app.trustedIDs = trustedIDs; + resolve(trustedIDs); + }).catch(error => reject(error)) + }) +} + +function setDB(db) { + DB = db; + slave.DB = DB; + transmit.DB = DB; +} + +function connectWS(floID) { + let url = nodeURL[floID]; + return new Promise((resolve, reject) => { + const ws = new WebSocket(url); + ws.on('open', _ => resolve(ws)); + ws.on('error', _ => reject(error)); + }) +} + +function connectToMaster(i = 0) { + if (i >= nodeList.length) { + console.error("No master is found, and myFloID is not in list. This should not happen!"); + process.exit(1); + } + let floID = nodeList[i]; + if (floID === myFloID) + serveAsMaster(); + else + connectWS(floID).then(ws => { + ws.floID = floID; + ws.onclose = () => connectToMaster(i); + serveAsSlave(ws); + }).catch(error => { + console.log(`Node(${floID}) is offline`); + connectToMaster(i + 1) + }); +} + +function serveAsMaster() { + app.resume(); + slave.stop(); +} + +function serveAsSlave(masterWS) { + app.pause(); + slave.start(masterWS); +} + module.exports = function startServer(public_dir) { try { var _tmp = require('../args/keys.json'); @@ -35,13 +202,15 @@ module.exports = function startServer(public_dir) { global.PUBLIC_DIR = public_dir; console.debug(PUBLIC_DIR, global.myFloID); - Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(DB => { - const app = App(config['secret'], config['trusted-floIDs'], DB); - app.listen(PORT, () => console.log(`Server Running at port ${PORT}`)); - //start backup - if (config["backup-port"] && config["backup-floIDs"].length) { - var backupTransmitter = require('./backup/transmit'); - backupTransmitter = new backupTransmitter(DB, config["backup-port"], config["backup-floIDs"]); - } - }); + Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => { + setDB(db); + app = new App(config['secret'], DB); + refreshData(true).then(_ => { + app.start(PORT).then(result => { + console.log(result); + transmit.init(app.wss); + connectToMaster(); + }).catch(error => console.error(error)) + }).catch(error => console.error(error)) + }).catch(error => console.error(error)); }; \ No newline at end of file