Update main.js
- fetch data (nodeList, TagList, trustedIDs) from blockchain - Act as slave or master when needed
This commit is contained in:
parent
0b70f099b0
commit
db9ff02488
187
src/main.js
187
src/main.js
@ -9,6 +9,173 @@ const Database = require("./database");
|
||||
const App = require('./app');
|
||||
const PORT = config['port'];
|
||||
|
||||
const K_Bucket = require('./backup/KBucket');
|
||||
const slave = require('./backup/storage');
|
||||
const transmit = require('./backup/transmit');
|
||||
|
||||
var DB, app;
|
||||
|
||||
var nodeList, nodeURL, nodeKBucket;
|
||||
|
||||
function refreshData(startup = false) {
|
||||
return new Promise((resolve, reject) => {
|
||||
refreshDataFromBlockchain().then(result => {
|
||||
loadDataFromDB(result, startup)
|
||||
.then(_ => resolve("Data refresh successful"))
|
||||
.catch(error => reject(error))
|
||||
}).catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
function refreshDataFromBlockchain() {
|
||||
return new Promise((resolve, reject) => {
|
||||
DB.query("SELECT num FROM lastTx WHERE floID=?", [floGlobals.adminID]).then(result => {
|
||||
let lastTx = result.length ? result[0].num : 0;
|
||||
floBlockchainAPI.readData(floGlobals.adminID, {
|
||||
ignoreOld: lastTx,
|
||||
sentOnly: true,
|
||||
pattern: floGlobals.application
|
||||
}).then(result => {
|
||||
let promises = [],
|
||||
nodes_change = false,
|
||||
trusted_change = false;
|
||||
result.data.reverse().forEach(data => {
|
||||
var content = JSON.parse(data)[floGlobals.application];
|
||||
//Node List
|
||||
if (content.Nodes) {
|
||||
nodes_change = true;
|
||||
if (content.Nodes.remove)
|
||||
for (let n of content.Nodes.remove)
|
||||
promises.push(DB.query("DELETE FROM nodeURL WHERE floID=?", [n]));
|
||||
if (content.Nodes.add)
|
||||
for (let n in content.Nodes.add)
|
||||
promises.push(DB.query("INSERT INTO nodeURL (floID, url) VALUE (?,?) ON DUPLICATE KEY UPDATE url=NEW.url", [n, content.Nodes.add[n]]));
|
||||
}
|
||||
//Trusted List
|
||||
if (content.Trusted) {
|
||||
trusted_change = true;
|
||||
if (content.Trusted.remove)
|
||||
for (let id of content.Trusted.remove)
|
||||
promises.push(DB.query("DELETE FROM trustedList WHERE floID=?", [id]));
|
||||
if (content.Trusted.add)
|
||||
for (let id of content.Trusted.add)
|
||||
promises.push(DB.query("INSERT INTO trustedList (floID) VALUE (?) ON DUPLICATE KEY UPDATE floID=NEW.floID", [id]));
|
||||
}
|
||||
//Tag List with priority and API
|
||||
if (content.Tag) {
|
||||
if (content.Tag.remove)
|
||||
for (let t of content.Tag.remove)
|
||||
promises.push(DB.query("DELETE FROM TagList WHERE tag=?", [t]));
|
||||
if (content.Tag.add)
|
||||
for (let t in content.Tag.add)
|
||||
promises.push(DB.query("INSERT INTO TagList (tag, sellPriority, buyPriority, api) VALUE (?,?,?,?)", [t, content.Tag.add[t].sellPriority, content.Tag.add[t].buyPriority, content.Tag.add[t].api]));
|
||||
if (content.Tag.update)
|
||||
for (let t in content.Tag.update)
|
||||
for (let a in content.Tag.update[t])
|
||||
promises.push(`UPDATE TagList WHERE tag=? SET ${a}=?`, [t, content.Tag.update[t][a]]);
|
||||
}
|
||||
});
|
||||
promises.push(DB.query("INSERT INTO lastTx (floID, num) VALUE (?, ?) ON DUPLICATE KEY UPDATE num=NEW.num", [floGlobals.adminID, result.totalTxs]));
|
||||
//Check if all save process were successful
|
||||
Promise.allSettled(promises).then(results => {
|
||||
if (results.reduce((a, r) => r.status === "rejected" ? ++a : a, 0))
|
||||
console.warn("Some data might not have been saved in database correctly");
|
||||
});
|
||||
resolve({
|
||||
nodes: nodes_change,
|
||||
trusted: trusted_change
|
||||
});
|
||||
}).catch(error => reject(error));
|
||||
}).catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
function loadDataFromDB(changes, startup) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let promises = [];
|
||||
if (startup || changes.nodes)
|
||||
promises.push(loadDataFromDB.nodeList());
|
||||
if (startup || changes.trusted)
|
||||
promises.push(loadDataFromDB.trustedIDs);
|
||||
Promise.all(promises)
|
||||
.then(_ => resolve("Data load successful"))
|
||||
.catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
loadDataFromDB.nodeList = function() {
|
||||
return new Promise((resolve, reject) => {
|
||||
DB.query("SELECT * FROM nodeList").then(result => {
|
||||
let nodes = {}
|
||||
for (let i in result)
|
||||
nodes[result[i].floID] = result[i];
|
||||
nodeURL = nodes;
|
||||
nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL));
|
||||
nodeList = nodeKBucket.order;
|
||||
//update dependents
|
||||
transmit.nodeList = nodeList;
|
||||
resolve(nodeList);
|
||||
}).catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
loadDataFromDB.trustedIDs = function() {
|
||||
return new Promise((resolve, reject) => {
|
||||
DB.query("SELECT * FROM trustedList").then(result => {
|
||||
let trustedIDs = [];
|
||||
for (let i in result)
|
||||
trustedIDs.push(result[i].floID);
|
||||
//update dependents
|
||||
app.trustedIDs = trustedIDs;
|
||||
resolve(trustedIDs);
|
||||
}).catch(error => reject(error))
|
||||
})
|
||||
}
|
||||
|
||||
function setDB(db) {
|
||||
DB = db;
|
||||
slave.DB = DB;
|
||||
transmit.DB = DB;
|
||||
}
|
||||
|
||||
function connectWS(floID) {
|
||||
let url = nodeURL[floID];
|
||||
return new Promise((resolve, reject) => {
|
||||
const ws = new WebSocket(url);
|
||||
ws.on('open', _ => resolve(ws));
|
||||
ws.on('error', _ => reject(error));
|
||||
})
|
||||
}
|
||||
|
||||
function connectToMaster(i = 0) {
|
||||
if (i >= nodeList.length) {
|
||||
console.error("No master is found, and myFloID is not in list. This should not happen!");
|
||||
process.exit(1);
|
||||
}
|
||||
let floID = nodeList[i];
|
||||
if (floID === myFloID)
|
||||
serveAsMaster();
|
||||
else
|
||||
connectWS(floID).then(ws => {
|
||||
ws.floID = floID;
|
||||
ws.onclose = () => connectToMaster(i);
|
||||
serveAsSlave(ws);
|
||||
}).catch(error => {
|
||||
console.log(`Node(${floID}) is offline`);
|
||||
connectToMaster(i + 1)
|
||||
});
|
||||
}
|
||||
|
||||
function serveAsMaster() {
|
||||
app.resume();
|
||||
slave.stop();
|
||||
}
|
||||
|
||||
function serveAsSlave(masterWS) {
|
||||
app.pause();
|
||||
slave.start(masterWS);
|
||||
}
|
||||
|
||||
module.exports = function startServer(public_dir) {
|
||||
try {
|
||||
var _tmp = require('../args/keys.json');
|
||||
@ -35,13 +202,15 @@ module.exports = function startServer(public_dir) {
|
||||
global.PUBLIC_DIR = public_dir;
|
||||
console.debug(PUBLIC_DIR, global.myFloID);
|
||||
|
||||
Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(DB => {
|
||||
const app = App(config['secret'], config['trusted-floIDs'], DB);
|
||||
app.listen(PORT, () => console.log(`Server Running at port ${PORT}`));
|
||||
//start backup
|
||||
if (config["backup-port"] && config["backup-floIDs"].length) {
|
||||
var backupTransmitter = require('./backup/transmit');
|
||||
backupTransmitter = new backupTransmitter(DB, config["backup-port"], config["backup-floIDs"]);
|
||||
}
|
||||
});
|
||||
Database(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(db => {
|
||||
setDB(db);
|
||||
app = new App(config['secret'], DB);
|
||||
refreshData(true).then(_ => {
|
||||
app.start(PORT).then(result => {
|
||||
console.log(result);
|
||||
transmit.init(app.wss);
|
||||
connectToMaster();
|
||||
}).catch(error => console.error(error))
|
||||
}).catch(error => console.error(error))
|
||||
}).catch(error => console.error(error));
|
||||
};
|
||||
Loading…
Reference in New Issue
Block a user