diff --git a/src/app.js b/src/app.js new file mode 100644 index 0000000..83bf4f1 --- /dev/null +++ b/src/app.js @@ -0,0 +1,127 @@ +'use strict'; +const express = require('express'); +//const cookieParser = require("cookie-parser"); +//const sessions = require('express-session'); +const Request = require('./request'); +const path = require('path'); +const PUBLIC_DIR = path.resolve(__dirname, '..', 'docs'); + +module.exports = function App(secret) { + + if (!(this instanceof App)) + return new App(secret); + + var server = null; + const app = express(); + //session middleware + /*app.use(sessions({ + secret: secret, + saveUninitialized: true, + resave: false, + name: "session" + }));*/ + // parsing the incoming data + app.use(express.json()); + app.use(express.urlencoded({ + extended: true + })); + //serving public file + app.use(express.static(PUBLIC_DIR)); + // cookie parser middleware + //app.use(cookieParser()); + + /* Decentralising - Users will load from user-end files and request via APIs only + //Initital page loading + app.get('/', (req, res) => res.sendFile('home.html', { + root: PUBLIC_DIR + })); + */ + + app.use(function (req, res, next) { + res.setHeader('Access-Control-Allow-Origin', "*"); + // Request methods you wish to allow + res.setHeader('Access-Control-Allow-Methods', 'GET, POST'); + // Request headers you wish to allow + res.setHeader('Access-Control-Allow-Headers', 'X-Requested-With,content-type'); + // Pass to next layer of middleware + next(); + }) + + //get code for login + app.get('/get-login-code', Request.GetLoginCode); + + //login request + app.post('/login', Request.Login); + + //logout request + app.post('/logout', Request.Logout); + + //place sell or buy order + app.post('/buy', Request.PlaceBuyOrder); + app.post('/sell', Request.PlaceSellOrder); + + //cancel sell or buy order + app.post('/cancel', Request.CancelOrder); + + //transfer amount to another user + //app.post('/transfer-asset', Request.TransferAsset); + + //list all orders 
and trades + app.get('/list-sellorders', Request.ListSellOrders); + app.get('/list-buyorders', Request.ListBuyOrders); + app.get('/list-trades', Request.ListTradeTransactions); + + //get rates, balance and tx + app.get('/rate-history', Request.GetRateHistory); + //app.get('/get-balance', Request.GetBalance); + //app.get('/get-transaction', Request.GetTransaction); + app.get('/get-sink', Request.GetSink); + + //get account details + app.post('/account', Request.Account); + + //withdraw and deposit request + app.post('/deposit-asset', Request.DepositAsset); + app.post('/withdraw-asset', Request.WithdrawAsset); + app.post('/get-transact', Request.GetUserTransacts); + + //generate or discard sinks (admin only) + app.post('/generate-sink', Request.GenerateSink); + app.post('/reshare-sink', Request.ReshareSink); + app.post('/discard-sink', Request.DiscardSink); + + Request.secret = secret; + + //Properties + let self = this; + + //return server, express-app + Object.defineProperty(self, "server", { + get: () => server + }); + Object.defineProperty(self, "express", { + get: () => app + }); + + //Refresh data (from blockchain) + //self.refreshData = (nodeList) => Request.refreshData(nodeList); + + //Start (or) Stop servers + self.start = (port) => new Promise(resolve => { + server = app.listen(port, () => { + resolve(`Server Running at port ${port}`); + }); + }); + self.stop = () => new Promise(resolve => { + server.close(() => { + server = null; + resolve('Server stopped'); + }); + }); + + //(Node is not master) Pause serving the clients + self.pause = () => Request.pause(); + //(Node is master) Resume serving the clients + self.resume = () => Request.resume(); + +} \ No newline at end of file diff --git a/src/background.js b/src/background.js new file mode 100644 index 0000000..d8cb45b --- /dev/null +++ b/src/background.js @@ -0,0 +1,249 @@ +'use strict'; + +const keys = require('./keys'); +const blockchain = require('./blockchain'); +const pCode = 
require('../docs/scripts/floTradeAPI').processCode; +const DB = require("./database"); +const coupling = require('./coupling'); +const price = require('./price'); + +const { + PERIOD_INTERVAL, + REQUEST_TIMEOUT +} = require('./_constants')['background']; + +var updateBalance; // container for updateBalance function + +const verifyTx = {}; + +function confirmDepositFLO() { + DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, pCode.STATUS_PENDING]).then(results => { + results.forEach(r => { + verifyTx.FLO(r.floID, r.txid, keys.sink_groups.TRADE).then(amount => { + var txQueries = []; + txQueries.push(updateBalance.add(r.floID, "FLO", amount)); + txQueries.push(["UPDATE VaultTransactions SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id]]); + DB.transaction(txQueries) + .then(result => console.info("FLO deposited:", r.floID, amount)) + .catch(error => console.error(error)) + }).catch(error => { + console.error(error); + if (error[0]) + DB.query("UPDATE VaultTransactions SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) + .then(_ => null).catch(error => console.error(error)); + }); + }) + }).catch(error => console.error(error)) +} + +verifyTx.FLO = function (sender, txid, group) { + return new Promise((resolve, reject) => { + floBlockchainAPI.getTx(txid).then(tx => { + let vin_sender = tx.vin.filter(v => v.addr === sender) + if (!vin_sender.length) + return reject([true, "Transaction not sent by the sender"]); + if (vin_sender.length !== tx.vin.length) + return reject([true, "Transaction input containes other floIDs"]); + if (!tx.blockheight) + return reject([false, "Transaction not included in any block yet"]); + if (!tx.confirmations) + return reject([false, "Transaction not confirmed yet"]); + let amount = tx.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? 
a + v.value : a, 0); + if (amount == 0) + return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) + else + resolve(amount); + }).catch(error => reject([false, error])) + }) +} + +function confirmDepositBTC() { + DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, "BTC", pCode.ASSET_TYPE_COIN, pCode.STATUS_PENDING]).then(results => { + results.forEach(r => { + verifyTx.BTC(r.floID, r.txid, keys.sink_groups.TRADE).then(amount => { + var txQueries = []; + txQueries.push(updateBalance.add(r.floID, "BTC", amount)); + txQueries.push(["UPDATE VaultTransactions SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id]]); + DB.transaction(txQueries) + .then(result => console.info("BTC deposited:", r.floID, amount)) + .catch(error => console.error(error)) + }).catch(error => { + console.error(error); + if (error[0]) + DB.query("UPDATE VaultTransactions SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) + .then(_ => null).catch(error => console.error(error)); + }); + }) + }).catch(error => console.error(error)) +} + +verifyTx.BTC = function (sender, txid, group) { + return new Promise((resolve, reject) => { + btcOperator.getTx(txid).then(tx => { + let vin_sender = tx.inputs.filter(v => floCrypto.isSameAddr(v.address, sender)) + if (!vin_sender.length) + return reject([true, "Transaction not sent by the sender"]); + if (vin_sender.length !== tx.inputs.length) + return reject([true, "Transaction input containes other floIDs"]); + if (!tx.block) + return reject([false, "Transaction not included in any block yet"]); + if (!tx.confirmations) + return reject([false, "Transaction not confirmed yet"]); + let amount = tx.outputs.reduce((a, v) => + keys.sink_chest.includes(group, floCrypto.toFloID(v.address, { bech: [coinjs.bech32.version] })) ? 
a + parseFloat(v.value) : a, 0); + if (amount == 0) + return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) + else + resolve(amount); + }).catch(error => reject([false, error])) + }) +} + +/* +function confirmDepositToken() { + DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_TOKEN, pCode.STATUS_PENDING]).then(results => { + results.forEach(r => { + verifyTx.token(r.floID, r.txid, keys.sink_groups.TRADE).then(({ token, amount, flo_amount }) => { + DB.query("SELECT id FROM VaultTransactions where floID=? AND mode=? AND asset=? AND asset_type=? AND txid=?", [r.floID, pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, r.txid]).then(result => { + let txQueries = []; + //Add the FLO balance if necessary + if (!result.length) { + txQueries.push(updateBalance.add(r.floID, "FLO", flo_amount)); + txQueries.push(["INSERT INTO VaultTransactions(txid, floID, mode, asset_type, asset, amount, r_status) VALUES (?)", [[r.txid, r.floID, pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_COIN, "FLO", flo_amount, pCode.STATUS_SUCCESS]]]); + } + txQueries.push(["UPDATE VaultTransactions SET r_status=?, asset=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, token, amount, r.id]]); + txQueries.push(updateBalance.add(r.floID, token, amount)); + DB.transaction(txQueries) + .then(result => console.info("Token deposited:", r.floID, token, amount)) + .catch(error => console.error(error)); + }).catch(error => console.error(error)); + }).catch(error => { + console.error(error); + if (error[0]) + DB.query("UPDATE VaultTransactions SET r_status=? 
WHERE id=?", [pCode.STATUS_REJECTED, r.id]) + .then(_ => null).catch(error => console.error(error)); + }); + }) + }).catch(error => console.error(error)) +} + +verifyTx.token = function (sender, txid, group, currencyOnly = false) { + return new Promise((resolve, reject) => { + floTokenAPI.getTx(txid).then(tx => { + if (tx.parsedFloData.type !== "transfer") + return reject([true, "Transaction type not 'transfer'"]); + else if (tx.parsedFloData.transferType !== "token") + return reject([true, "Transaction transfer is not 'token'"]); + var token = tx.parsedFloData.tokenIdentification, + amount = tx.parsedFloData.tokenAmount; + if (currencyOnly && token !== floGlobals.currency) + return reject([true, "Token not currency"]); + else if (!currencyOnly && ((!keys.assets.includes(token) && token !== floGlobals.currency) || token === "FLO")) + return reject([true, "Token not authorised"]); + let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender) + if (!vin_sender.length) + return reject([true, "Transaction not sent by the sender"]); + let flo_amount = tx.transactionDetails.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); + if (flo_amount == 0) + return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) + else + resolve({ token, amount, flo_amount }); + }).catch(error => reject([false, error])) + }) +} +*/ + +function retryVaultWithdrawal() { + DB.query("SELECT id, floID, asset, asset_type, amount FROM VaultTransactions WHERE mode=? 
AND r_status=?", [pCode.VAULT_MODE_WITHDRAW, pCode.STATUS_PENDING]).then(results => { + results.forEach(r => { + if (r.asset_type == pCode.ASSET_TYPE_COIN) { + if (r.asset == "FLO" || r.asset == "BTC") + blockchain.withdrawAsset.retry(r.floID, r.asset, r.amount, r.id); + } else if (r.asset_type == pCode.ASSET_TYPE_TOKEN) + blockchain.withdrawAsset.retry(r.floID, r.asset, r.amount, r.id) + }) + }).catch(error => console.error(error)) +} + +function confirmVaultWithdraw() { + DB.query("SELECT id, floID, asset, asset_type, amount, txid FROM VaultTransactions WHERE mode=? AND r_status=?", [pCode.VAULT_MODE_WITHDRAW, pCode.STATUS_CONFIRMATION]).then(results => { + results.forEach(r => { + if (r.asset_type == pCode.ASSET_TYPE_COIN) { + if (r.asset == "FLO") + floBlockchainAPI.getTx(r.txid).then(tx => { + if (!tx.blockheight || !tx.confirmations) //Still not confirmed + return; + DB.query("UPDATE VaultTransactions SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) + .then(result => console.info("FLO withdrawed:", r.floID, r.amount)) + .catch(error => console.error(error)) + }).catch(error => console.error(error)); + else if (r.asset == "BTC") + btcOperator.getTx(r.txid).then(tx => { + if (!tx.block || !tx.confirmations) //Still not confirmed + return; + DB.query("UPDATE VaultTransactions SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) + .then(result => console.info("BTC withdrawed:", r.floID, r.amount)) + .catch(error => console.error(error)) + }).catch(error => console.error(error)); + } else if (r.asset_type == pCode.ASSET_TYPE_TOKEN) + floTokenAPI.getTx(r.txid).then(tx => { + if (!tx.transactionDetails.blockheight || !tx.transactionDetails.confirmations) //Still not confirmed + return; + DB.query("UPDATE VaultTransactions SET r_status=? 
WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) + .then(result => console.info("Token withdrawed:", r.floID, r.asset, r.amount)) + .catch(error => console.error(error)); + }).catch(error => console.error(error)); + }) + }).catch(error => console.error(error)); +} + +//Periodic Process + +function processAll() { + //deposit-withdraw asset balance + if (keys.sink_chest.list(keys.sink_groups.TRADE).length) { + confirmDepositFLO(); + confirmDepositBTC(); + //confirmDepositToken(); //commented as its only BTC to FLO and vise versa for now + retryVaultWithdrawal(); + confirmVaultWithdraw(); + } +} + +var lastSyncBlockHeight = 0; + +function periodicProcess() { + floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => { + if (lastSyncBlockHeight < result.blocks[0].height) { + lastSyncBlockHeight = result.blocks[0].height; + processAll(); + console.log("Last Block :", lastSyncBlockHeight); + } + }).catch(error => console.error(error)); +} + +function periodicProcess_start() { + periodicProcess_stop(); + periodicProcess(); + //keys.assets.tradeList.forEach(asset => coupling.initiate(asset)); //Trigger pairing only when order placed + periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL); +} + +function periodicProcess_stop() { + if (periodicProcess.instance !== undefined) { + clearInterval(periodicProcess.instance); + delete periodicProcess.instance; + } + coupling.stopAll(); +} + +module.exports = { + blockchain, + periodicProcess: { + start: periodicProcess_start, + stop: periodicProcess_stop + }, + set updateBalance(f) { + updateBalance = f; + } +} \ No newline at end of file diff --git a/src/backup/head.js b/src/backup/head.js new file mode 100644 index 0000000..b332198 --- /dev/null +++ b/src/backup/head.js @@ -0,0 +1,492 @@ +'use strict'; + +const keys = require('../keys'); +const K_Bucket = require('../../docs/scripts/floTradeAPI').K_Bucket; +const slave = require('./slave'); +const sync = require('./sync'); +const WebSocket = require('ws'); + 
+const { BACKUP_INTERVAL } = require("../_constants")["backup"]; +const { DISCARD_COOLDOWN } = require("../_constants")["keys"]; + +var _app, _wss; //Container for app and wss +var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list +const connectedSlaves = {}, + shares_collected = {}, + shares_pending = {}, + discarded_sinks = []; + +var _mode = null; +const SLAVE_MODE = 0, + MASTER_MODE = 1; + +//Shares +function generateShares(sinkKey) { + let nextNodes = nodeKBucket.nextNode(keys.node_id, null), + aliveNodes = Object.keys(connectedSlaves); + nextNodes.unshift(keys.node_id); + aliveNodes.unshift(keys.node_id); + let shares, mappedShares = {}; + shares = keys.generateShares(sinkKey, nextNodes.length, aliveNodes.length); + for (let i in nextNodes) + mappedShares[nextNodes[i]] = shares[i]; + return mappedShares; +} + +function sendShares(ws, sinkID) { + if (!(sinkID in shares_pending) || !(ws.floID in shares_pending[sinkID].shares)) + return; + let { ref, group } = shares_pending[sinkID], + shares = shares_pending[sinkID].shares[ws.floID]; + delete shares_pending[sinkID].shares[ws.floID]; //delete the share after sending it to respective slave + shares.forEach(s => ws.send(JSON.stringify({ + command: "SINK_SHARE", + sinkID, ref, group, + share: floCrypto.encryptData(s, ws.pubKey) + }))); +} + +function sendSharesToNodes(sinkID, group, shares) { + if (discarded_sinks.includes(sinkID)) { //sinkID is discarded, abort the new shares + let i = discarded_sinks.findIndex(sinkID); + discarded_sinks.splice(i, 1); + return; + } + let ref = Date.now(); + shares_pending[sinkID] = { shares, group, ref }; + if (keys.node_id in shares) { + shares_pending[sinkID].shares[keys.node_id].forEach(s => + keys.addShare(group, sinkID, ref, s).then(_ => null).catch(error => console.error(error))); + delete shares_pending[sinkID].shares[keys.node_id]; + } + for (let node in shares) + if (node in connectedSlaves) + sendShares(connectedSlaves[node], sinkID); + 
keys.sink_chest.set_id(group, sinkID, ref); +} + +function requestShare(ws, group, sinkID) { + ws.send(JSON.stringify({ + command: "SEND_SHARE", + group, sinkID, + pubKey: keys.node_pub + })); +} + +/* +function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) { + const transferToken = token => new Promise((resolve, reject) => { + floTokenAPI.getBalance(oldSinkID, token).then(tokenBalance => { + floBlockchainAPI.writeData(oldSinkID, `send ${tokenBalance} ${token}# |Trade-market New sink`, oldSinkKey, newSink.floID, false) + .then(txid => resolve(txid)) + .catch(error => reject(error)) + }) + }); + return new Promise((resolve, reject) => { + console.debug("Transferring tokens to new Sink:", newSink.floID) + Promise.allSettled(tokenList.map(token => transferToken(token))).then(result => { + let failedFlag = false; + tokenList.forEach((token, i) => { + if (result[i].status === "fulfilled") + console.log(token, result[i].value); + else { + failedFlag = true; + console.error(token, result[i].reason); + } + }); + if (failedFlag) + return reject("Some token transfer has failed"); + floBlockchainAPI.getBalance(oldSinkID).then(floBalance => { + floTokenAPI.getBalance(oldSinkID).then(cashBalance => { + floBlockchainAPI.sendTx(oldSinkID, newSink.floID, floBalance - floGlobals.fee, oldSinkKey, `send ${cashBalance} ${floGlobals.currency}# |Trade-market New sink`) + .then(result => resolve(result)) + .catch(error => reject(error)) + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }); + }) +} +*/ + +function collectAndCall(group, sinkID, callback, timeout = null) { + if (!(callback instanceof Function)) + throw Error("callback should be a function"); + if (!(sinkID in shares_collected)) { //if not already collecting shares for sinkID, then initiate collection + shares_collected[sinkID] = { group, ref: 0, callbacks: [], shares: {} }; + for (let floID in connectedSlaves) + requestShare(connectedSlaves[floID], group, sinkID); + keys.getShares(group, 
sinkID) + .then(({ ref, shares }) => shares.forEach(s => collectShares(sinkID, ref, s))) + .catch(error => console.error(error)) + } + shares_collected[sinkID].callbacks.push(callback); + if (timeout) + setTimeout(() => { + if (sinkID in shares_collected) { + let i = shares_collected[sinkID].callbacks.indexOf(callback); + delete shares_collected[sinkID].callbacks[i]; //deleting will empty the index, but space will be there so that order of other indexes are not affected + } + }, timeout); +} + +collectAndCall.isAlive = (sinkID, callbackRef) => (sinkID in shares_collected && shares_collected[sinkID].callbacks.indexOf(callbackRef) != -1); + +function collectShares(sinkID, ref, share) { + if (_mode !== MASTER_MODE) + return console.warn("Not serving as master"); + if (!(sinkID in shares_collected)) + return console.debug("Received shares for sink thats not been collected right now"); + if (shares_collected[sinkID].ref > ref) + return console.debug("Received expired share"); + else if (shares_collected[sinkID].ref < ref) { + shares_collected[sinkID].ref = ref; + shares_collected[sinkID].shares = []; + } + if (shares_collected[sinkID].shares.includes(share)) + return console.debug("Received duplicate share"); + shares_collected[sinkID].shares.push(share); + try { + let sinkKey = floCrypto.retrieveShamirSecret(shares_collected[sinkID].shares); + if (floCrypto.verifyPrivKey(sinkKey, sinkID)) { + console.debug("Shares collected successfully for", sinkID); + shares_collected[sinkID].callbacks.forEach(fn => fn instanceof Function ? fn(sinkKey) : null); + delete shares_collected[sinkID]; + } + } catch { + //Unable to retrive sink private key. Waiting for more shares! 
Do nothing for now + }; +} + +function connectWS(floID) { + let url = nodeURL[floID]; + return new Promise((resolve, reject) => { + const ws = new WebSocket('wss://' + url); + ws.on('open', _ => resolve(ws)); + ws.on('error', error => reject(error)); + }) +} + +function connectToMaster(i = 0, init = false) { + if (i >= nodeList.length) { + console.error("No master is found and Node not in list. This should not happen!"); + process.exit(1); + } + let floID = nodeList[i]; + if (floID === keys.node_id) + serveAsMaster(init); + else + connectWS(floID).then(ws => { + ws.floID = floID; + ws.onclose = () => connectToMaster(i); + serveAsSlave(ws, init); + }).catch(error => { + console.log(`Node(${floID}) is offline`); + connectToMaster(i + 1, init) + }); +} + +function informLiveNodes(init) { + let message = { + floID: keys.node_id, + type: "UPDATE_MASTER", + pubKey: keys.node_pub, + req_time: Date.now() + }; + message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv); + message = JSON.stringify(message); + let nodes = nodeList.filter(n => n !== keys.node_id); + Promise.allSettled(nodes.map(n => connectWS(n))).then(result => { + let flag = false; + for (let i in result) + if (result[i].status === "fulfilled") { + let ws = result[i].value; + ws.send(message); + ws.close(); + flag = true; + } else + console.warn(`Node(${nodes[i]}) is offline`); + if (init && flag) + syncRequest(); + keys.getStoredList().then(stored_list => { + if (Object.keys(stored_list).length) { + keys.getDiscardedList().then(discarded_list => { + let cur_time = Date.now(); + for (let group in stored_list) + stored_list[group].forEach(id => { + if (!(id in discarded_list)) + reconstructShares(group, id) + else if (cur_time - discarded_list[id] < DISCARD_COOLDOWN) //sinkID still in cooldown period + keys.sink_chest.set_id(group, id, null); + }); + }).catch(error => console.error(error)) + } else if (init && !flag) { + console.log("Starting the server..."); + //generate a 
sinkID for each group in starting list + keys.sink_groups.initial_list.forEach(group => + generateSink(group).then(_ => null).catch(e => console.error(e))); + } + }).catch(error => console.error(error)); + }); +} + +function syncRequest(cur = keys.node_id) { + //Sync data from next available node + let nextNode = nodeKBucket.nextNode(cur); + if (!nextNode) + return console.warn("No nodes available to Sync"); + connectWS(nextNode) + .then(ws => slave.syncRequest(ws)) + .catch(_ => syncRequest(nextNode)); +} + +function updateMaster(floID) { + let currentMaster = _mode === MASTER_MODE ? keys.node_id : slave.masterWS.floID; + if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster)) + connectToMaster(); +} + +function reconstructAllActiveShares() { + if (_mode !== MASTER_MODE) + return console.debug("Not serving as master"); + console.debug("Reconstructing shares for all active IDs") + let group_list = keys.sink_groups.list; + group_list.forEach(g => { + //active ids also ignore ids that are in queue for reconstructing shares + let active_ids = keys.sink_chest.active_list(g); + active_ids.forEach(id => reconstructShares(g, id)); + }); +} + +function reconstructShares(group, sinkID) { + if (_mode !== MASTER_MODE) + return console.warn(`Not serving as master, but reconstruct-shares is called for ${sinkID}(${group})`); + keys.sink_chest.set_id(group, sinkID, null); + collectAndCall(group, sinkID, sinkKey => sendSharesToNodes(sinkID, group, generateShares(sinkKey))); +} + +function slaveConnect(floID, pubKey, ws, slave_sinks) { + if (_mode !== MASTER_MODE) + return console.warn("Not serving as master"); + ws.floID = floID; + ws.pubKey = pubKey; + connectedSlaves[floID] = ws; + + //Send shares if need to be delivered + for (let sinkID in shares_pending) + if (floID in shares_pending[sinkID].shares) + sendShares(ws, sinkID); + //Request shares if any + for (let sinkID in shares_collected) + requestShare(ws, shares_collected[sinkID].group, sinkID); + //check if sinks in 
slaves are present + if (slave_sinks instanceof Object) { + for (let group in slave_sinks) + for (let sinkID of slave_sinks[group]) { + if (!keys.sink_chest.includes(group, sinkID)) + keys.checkIfDiscarded(sinkID) + .then(result => result === false ? reconstructShares(group, sinkID) : null) + .catch(error => console.error(error)) + } + } +} + +const eCode = require('../../docs/scripts/floTradeAPI').errorCode; + +function generateSink(group) { + return new Promise((resolve, reject) => { + if (!keys.sink_groups.generate_list.includes(group)) + return reject(INVALID(eCode.INVALID_VALUE, `Invalid Group ${group}`)); + try { + let newSink = floCrypto.generateNewID(); + console.debug("Generated sink:", group, newSink.floID); + sendSharesToNodes(newSink.floID, group, generateShares(newSink.privKey)); + resolve(`Generated ${newSink.floID} (${group})`); + } catch (error) { + reject(error) + } + }) +} + +function reshareSink(id) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(id)) + return reject(INVALID(eCode.INVALID_VALUE, `Invalid ID ${id}`)); + else { + let group = keys.sink_chest.find_group(id); + if (!group) + return reject(INVALID(eCode.NOT_FOUND, `ID ${id} not found`)); + else keys.checkIfDiscarded(id).then(result => { + if (result) + return reject(INVALID(eCode.NOT_FOUND, `ID is discarded`)); + try { + reconstructShares(group, id); + resolve(`Resharing ${id} (${group})`); + } catch (error) { + reject(error); + } + }).catch(error => reject(error)) + } + + }) +} + +function discardSink(id) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(id)) + return reject(INVALID(eCode.INVALID_VALUE, `Invalid ID ${id}`)); + else if (!keys.sink_chest.find_group(id)) + return reject(INVALID(eCode.NOT_FOUND, `ID ${id} not found`)); + else keys.checkIfDiscarded(id).then(result => { + if (result) + return reject(INVALID(eCode.DUPLICATE_ENTRY, `ID already discarded`)); + keys.discardSink(id).then(result => { + console.debug("Discarded 
sink:", id); + resolve(result); + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function checkForDiscardedSinks() { + let cur_time = Date.now(), + all_sinks = keys.sink_chest.get_all(); + for (let group in all_sinks) + all_sinks[group].forEach(id => keys.checkIfDiscarded(id).then(result => { + console.debug(group, id); //Check if group is correctly mapped, or if its changed by loop + if (result != false) { + if (cur_time - result > DISCARD_COOLDOWN) + keys.sink_chest.rm_id(group, id); + else + keys.sink_chest.set_id(group, id, null); + if (id in shares_collected && !discarded_sinks.includes(id)) + discarded_sinks.push(id); + } + }).catch(error => console.debug(error))) +} + +//Master interval process +function intervalProcess() { + checkForDiscardedSinks(); +} + +intervalProcess.start = () => { + intervalProcess.stop(); + intervalProcess.instance = setInterval(intervalProcess, BACKUP_INTERVAL); +} + +intervalProcess.stop = () => { + if (intervalProcess.instance !== undefined) { + clearInterval(intervalProcess.instance); + delete intervalProcess.instance; + } +} + +//Node becomes master +function serveAsMaster(init) { + console.info('Starting master process'); + slave.stop(); + _mode = MASTER_MODE; + keys.sink_chest.reset(); + intervalProcess.start(); + informLiveNodes(init); + _app.resume(); +} + +//Node becomes slave +function serveAsSlave(ws, init) { + console.info('Starting slave process'); + intervalProcess.stop(); + _app.pause(); + slave.start(ws, init); + _mode = SLAVE_MODE; +} + +//Transmistter +function startBackupTransmitter(server) { + _wss = new WebSocket.Server({ + server + }); + _wss.on('connection', ws => { + ws.on('message', message => { + //verify if from a backup node + try { + let invalid = null, + request = JSON.parse(message); + //console.debug(request); + if (!nodeList.includes(request.floID)) + invalid = `floID ${request.floID} not in nodeList`; + else if (request.floID !== floCrypto.getFloID(request.pubKey)) + 
invalid = "Invalid pubKey"; + else if (!floCrypto.verifySign(request.type + "|" + request.req_time, request.sign, request.pubKey)) + invalid = "Invalid signature"; + //TODO: check if request time is valid; + else switch (request.type) { + case "BACKUP_SYNC": + sync.sendBackupData(request.last_time, request.checksum, ws); + break; + case "HASH_SYNC": + sync.sendTableHash(request.tables, ws); + break; + case "RE_SYNC": + sync.sendTableData(request.tables, ws); + break; + case "UPDATE_MASTER": + updateMaster(request.floID); + break; + case "SLAVE_CONNECT": + slaveConnect(request.floID, request.pubKey, ws, request.sinks); + break; + case "SINK_SHARE": + collectShares(request.sinkID, request.ref, floCrypto.decryptData(request.share, keys.node_priv)) + default: + invalid = "Invalid Request Type"; + } + if (invalid) + ws.send(JSON.stringify({ + type: request.type, + command: "REQUEST_ERROR", + error: invalid + })); + } catch (error) { + console.error(error); + ws.send(JSON.stringify({ + command: "REQUEST_ERROR", + error: 'Unable to process the request!' 
+ })); + } + }); + ws.on('close', () => { + // remove from connected slaves (if needed) + if (ws.floID in connectedSlaves) + delete connectedSlaves[ws.floID]; + }) + }); +} + +function initProcess(app) { + _app = app; + startBackupTransmitter(_app.server); + connectToMaster(0, true); +} + +module.exports = { + init: initProcess, + collectAndCall, + reconstructAllActiveShares, + sink: { + generate: generateSink, + reshare: reshareSink, + discard: discardSink + }, + set nodeList(list) { + nodeURL = list; + nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); + nodeList = nodeKBucket.order; + }, + get nodeList() { + return nodeList; + }, + get wss() { + return _wss; + } +}; \ No newline at end of file diff --git a/src/backup/slave.js b/src/backup/slave.js new file mode 100644 index 0000000..315e101 --- /dev/null +++ b/src/backup/slave.js @@ -0,0 +1,429 @@ +'use strict'; + +const keys = require("../keys"); +const DB = require("../database"); +const { getTableHashes } = require("./sync"); + +const { + BACKUP_INTERVAL, + BACKUP_SYNC_TIMEOUT, + CHECKSUM_INTERVAL, + HASH_N_ROW +} = require("../_constants")["backup"]; + +var masterWS = null; //Container for Master websocket connection + +var intervalID = null; + +function startSlaveProcess(ws, init) { + if (!ws) throw Error("Master WS connection required"); + //stop existing process + stopSlaveProcess(); + //set masterWS + ws.on('message', processDataFromMaster); + masterWS = ws; + let sinks_stored = {}; + Promise.all([keys.getStoredList(), keys.getDiscardedList()]).then(result => { + let stored_list = result[0], + discarded_list = result[1]; + for (let group in stored_list) { + sinks_stored[group] = []; + for (let id of stored_list[group]) + if (!(id in discarded_list)) + sinks_stored[group].push(id); + } + }).catch(error => console.error(error)).finally(_ => { + //inform master + let message = { + floID: keys.node_id, + pubKey: keys.node_pub, + sinks: sinks_stored, + req_time: Date.now(), + type: 
"SLAVE_CONNECT" + } + message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv); + ws.send(JSON.stringify(message)); + //start sync + if (init) + requestInstance.open(); + intervalID = setInterval(() => requestInstance.open(), BACKUP_INTERVAL); + }) +} + +function stopSlaveProcess() { + if (masterWS !== null) { + masterWS.onclose = () => null; + masterWS.close(); + requestInstance.close(); + masterWS = null; + } + if (intervalID !== null) { + clearInterval(intervalID); + intervalID = null; + } +} + +function requestBackupSync(checksum_trigger, ws) { + return new Promise((resolve, reject) => { + DB.query('SELECT MAX(u_time) as last_time FROM _backup').then(result => { + let request = { + floID: keys.node_id, + pubKey: keys.node_pub, + type: "BACKUP_SYNC", + last_time: result[0].last_time, + checksum: checksum_trigger, + req_time: Date.now() + }; + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); + ws.send(JSON.stringify(request)); + resolve(request); + }).catch(error => reject(error)) + }) +} + +const requestInstance = { + ws: null, + cache: null, + checksum: null, + delete_data: null, + add_data: null, + request: null, + onetime: null, + last_response_time: null, + checksum_count_down: 0 +}; + +requestInstance.open = function (ws = null) { + const self = this; + //Check if there is an active request + if (self.request) { + console.log("A request is already active"); + if (self.last_response_time < Date.now() - BACKUP_SYNC_TIMEOUT) + self.close(); + else + return; + } + //Use websocket connection if passed, else use masterWS if available + if (ws) { + ws.on('message', processDataFromMaster); + self.onetime = true; + } else if (masterWS) + ws = masterWS; + else return console.warn("Not connected to master"); + + requestBackupSync(!self.checksum_count_down || self.onetime, ws).then(request => { + self.request = request; + self.cache = []; + self.last_response_time = Date.now(); + self.ws = ws; + 
}).catch(error => console.error(error)) +} + +requestInstance.close = function () { + const self = this; + if (self.onetime) + self.ws.close(); + else + self.checksum_count_down = self.checksum_count_down ? self.checksum_count_down - 1 : CHECKSUM_INTERVAL; + self.onetime = null; + self.ws = null; + self.cache = null; + self.checksum = null; + self.delete_data = null; + self.add_data = null; + self.request = null; + self.last_response_time = null; +} + +function processDataFromMaster(message) { + try { + message = JSON.parse(message); + //console.debug("Master:", message); + if (message.command.startsWith("SYNC")) + processBackupData(message); + else switch (message.command) { + case "SINK_SHARE": + storeSinkShare(message.group, message.sinkID, message.share, message.ref); + break; + case "SEND_SHARE": + sendSinkShare(message.group, message.sinkID, message.pubKey); + break; + case "REQUEST_ERROR": + console.log(message.error); + if (message.type === "BACKUP_SYNC") + requestInstance.close(); + break; + } + } catch (error) { + console.error(error); + } +} + +function storeSinkShare(group, sinkID, share, ref) { + share = floCrypto.decryptData(share, keys.node_priv); + keys.addShare(group, sinkID, ref, share) + .then(_ => null).catch(error => console.error(error)); +} + +function sendSinkShare(group, sinkID, pubKey) { + keys.getShares(group, sinkID).then(({ ref, shares }) => { + shares.forEach(s => { + let response = { + type: "SINK_SHARE", + sinkID, ref, + share: floCrypto.encryptData(s, pubKey), + floID: keys.node_id, + pubKey: keys.node_pub, + req_time: Date.now() + } + response.sign = floCrypto.signData(response.type + "|" + response.req_time, keys.node_priv); //TODO: strengthen signature + masterWS.send(JSON.stringify(response)); + }) + }).catch(error => console.error(error)); +} + +function processBackupData(response) { + //TODO: Sync improvements needed. (2 types) + //1. Either sync has to be completed or rollback all + //2. 
Each table/data should be treated as independent chunks + const self = requestInstance; + self.last_response_time = Date.now(); + switch (response.command) { + case "SYNC_END": + if (response.status) { + storeBackupData(self.cache, self.checksum).then(result => { + updateBackupTable(self.add_data, self.delete_data); + if (result) { + console.log("Backup Sync completed successfully"); + self.close(); + } else + console.log("Waiting for re-sync data"); + }).catch(_ => { + console.warn("Backup Sync was not successful"); + self.close(); + }); + } else { + console.info("Backup Sync was not successful! Failed info: ", response.info); + self.close(); + } + break; + case "SYNC_DELETE": + self.delete_data = response.delete_data; + self.cache.push(cacheBackupData(null, response.delete_data)); + break; + case "SYNC_HEADER": + self.add_data = response.add_data; + break; + case "SYNC_UPDATE": + self.cache.push(cacheBackupData(response.table, response.data)); + break; + case "SYNC_CHECKSUM": + self.checksum = response.checksum; + break; + case "SYNC_HASH": + verifyHash(response.hashes) + .then(mismatch => requestTableChunks(mismatch, self.ws)) + .catch(error => { + console.error(error); + self.close(); + }); + break; + } +} + +const cacheBackupData = (tableName, dataCache) => new Promise((resolve, reject) => { + DB.query("INSERT INTO _backupCache (t_name, data_cache) VALUE (?, ?)", [tableName, JSON.stringify(dataCache)]) + .then(_ => resolve(true)).catch(error => { + console.error(error); + reject(false); + }) +}); + +function storeBackupData(cache_promises, checksum_ref) { + return new Promise((resolve, reject) => { + Promise.allSettled(cache_promises).then(_ => { + console.log("START: BackupCache -> Tables"); + //Process 'Users' table 1st as it provides foreign key attribute to other tables + DB.query("SELECT * FROM _backupCache WHERE t_name=?", ["Users"]).then(data => { + Promise.allSettled(data.map(d => updateTableData("Users", JSON.parse(d.data_cache)))).then(result => { + 
storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NOT NULL").then(data => { + Promise.allSettled(data.map(d => updateTableData(d.t_name, JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + DB.query("SELECT * FROM _backupCache WHERE t_name IS NULL").then(data => { + Promise.allSettled(data.map(d => deleteTableData(JSON.parse(d.data_cache)))).then(result => { + storeBackupData.commit(data, result).then(_ => { + console.log("END: BackupCache -> Tables"); + if (!checksum_ref) //No checksum verification + resolve(true); + else + verifyChecksum(checksum_ref) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); + }) + }) + }) + }) + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables"); + reject(false); + }); + }) + }) + }).catch(error => { + console.error(error); + console.warn("ABORT: BackupCache -> Tables"); + reject(false); + }) + }) + }) + +} + +storeBackupData.commit = function (data, result) { + let promises = []; + for (let i = 0; i < data.length; i++) + switch (result[i].status) { + case "fulfilled": + promises.push(DB.query("DELETE FROM _backupCache WHERE id=?", data[i].id)); + break; + case "rejected": + console.error(result[i].reason); + promises.push(DB.query("UPDATE _backupCache SET fail=TRUE WHERE id=?", data[i].id)); + break; + } + return Promise.allSettled(promises); +} + +function updateBackupTable(add_data, delete_data) { + //update _backup table for added data + DB.transaction(add_data.map(r => [ + "INSERT INTO _backup (t_name, id, mode, u_time) VALUE (?, ?, TRUE, ?) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=?", + [r.t_name, r.id, validateValue(r.u_time), validateValue(r.u_time)] + ])).then(_ => null).catch(error => console.error(error)); + //update _backup table for deleted data + DB.transaction(delete_data.map(r => [ + "INSERT INTO _backup (t_name, id, mode, u_time) VALUE (?, ?, NULL, ?) 
ON DUPLICATE KEY UPDATE mode=NULL, u_time=?", + [r.t_name, r.id, validateValue(r.u_time), validateValue(r.u_time)] + ])).then(_ => null).catch(error => console.error(error)); +} + +function deleteTableData(data) { + return new Promise((resolve, reject) => { + let delete_needed = {}; + data.forEach(r => r.t_name in delete_needed ? delete_needed[r.t_name].push(r.id) : delete_needed[r.t_name] = [r.id]); + let queries = []; + for (let table in delete_needed) + queries.push(["DELETE FROM ?? WHERE id IN (?)", [table, delete_needed[table]]]); + DB.transaction(queries).then(_ => resolve(true)).catch(error => reject(error)); + }) +} + +function updateTableData(table, data) { + return new Promise((resolve, reject) => { + if (!data.length) + return resolve(null); + let cols = Object.keys(data[0]); + let values = data.map(r => cols.map(c => validateValue(r[c]))); + let statement = "INSERT INTO ?? (??) VALUES ? ON DUPLICATE KEY UPDATE " + Array(cols.length).fill("??=VALUES(??)").join(); + let query_values = [table, cols, values]; + cols.forEach(c => query_values.push(c, c)); + DB.query(statement, query_values).then(_ => resolve(true)).catch(error => reject(error)); + }) +} + +const validateValue = val => (typeof val === "string" && /\.\d{3}Z$/.test(val)) ? new Date(val) : val; + +function verifyChecksum(checksum_ref) { + return new Promise((resolve, reject) => { + DB.query("CHECKSUM TABLE ??", [Object.keys(checksum_ref)]).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + let mismatch = []; + for (let table in checksum) + if (checksum[table] != checksum_ref[table]) + mismatch.push(table); + //console.debug("Checksum-mismatch:", mismatch); + if (!mismatch.length) //Checksum of every table is verified. 
+ resolve(true); + else { //If one or more tables checksum is not correct, re-request the table data + requestHash(mismatch); + resolve(false); + } + }).catch(error => { + console.error(error); + reject(false); + }) + }) +} + +function requestHash(tables) { + //TODO: resync only necessary data (instead of entire table) + let self = requestInstance; + let request = { + floID: keys.node_id, + pubKey: keys.node_pub, + type: "HASH_SYNC", + tables: tables, + req_time: Date.now() + }; + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); + self.ws.send(JSON.stringify(request)); + self.request = request; + self.checksum = null; + self.cache = []; +} + +function verifyHash(hashes) { + const convertIntArray = obj => Object.keys(obj).map(i => parseInt(i)); + const checkHash = (table, hash_ref) => new Promise((res, rej) => { + getTableHashes(table).then(hash_cur => { + for (let i in hash_ref) + if (hash_ref[i] === hash_cur[i]) { + delete hash_ref[i]; + delete hash_cur[i]; + } + res([convertIntArray(hash_ref), convertIntArray(hash_cur)]); + }).catch(error => rej(error)) + }) + return new Promise((resolve, reject) => { + let tables = Object.keys(hashes); + Promise.allSettled(tables.map(t => checkHash(t, hashes[t]))).then(result => { + let mismatch = {}; + for (let t in tables) + if (result[t].status === "fulfilled") { + mismatch[tables[t]] = result[t].value; //Data that are incorrect/missing/deleted + //Data to be deleted (incorrect data will be added by resync) + let id_end = result[t].value[1].map(i => i * HASH_N_ROW); //eg if i=2 AND H_R_C = 5 then id_end = 2 * 5 = 10 (ie, range 6-10) + Promise.allSettled(id_end.map(i => + DB.query("DELETE FROM ?? WHERE id BETWEEN ? 
AND ?", [tables[t], i - HASH_N_ROW + 1, i]) //eg, i - HASH_N_ROW + 1 = 10 - 5 + 1 = 6 + )).then(_ => null); + } else + console.error(result[t].reason); + //console.debug("Hash-mismatch", mismatch); + resolve(mismatch); + }).catch(error => reject(error)) + }) +} + +function requestTableChunks(tables, ws) { + let request = { + floID: keys.node_id, + pubKey: keys.node_pub, + type: "RE_SYNC", + tables: tables, + req_time: Date.now() + }; + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); + ws.send(JSON.stringify(request)); +} + +module.exports = { + get masterWS() { + return masterWS; + }, + start: startSlaveProcess, + stop: stopSlaveProcess, + syncRequest: ws => requestInstance.open(ws) +} \ No newline at end of file diff --git a/src/backup/sync.js b/src/backup/sync.js new file mode 100644 index 0000000..313e2d8 --- /dev/null +++ b/src/backup/sync.js @@ -0,0 +1,282 @@ +'use strict'; + +const DB = require("../database"); + +const { + HASH_N_ROW +} = require("../_constants")["backup"]; + +//Backup Transfer +function sendBackupData(last_time, checksum, ws) { + if (!last_time) last_time = 0; + else if (typeof last_time === "string" && /\.\d{3}Z$/.test(last_time)) + last_time = last_time.substring(0, last_time.length - 1); + let promises = [ + backupSync_data(last_time, ws), + backupSync_delete(last_time, ws) + ]; + if (checksum) + promises.push(backupSync_checksum(ws)); + Promise.allSettled(promises).then(result => { + let failedSync = []; + result.forEach(r => r.status === "rejected" ? 
failedSync.push(r.reason) : null); + if (failedSync.length) { + console.info("Backup Sync Failed:", failedSync); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: false, + info: failedSync + })); + } else { + console.info("Backup Sync completed"); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: true + })); + } + }); +} + +function backupSync_delete(last_time, ws) { + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM _backup WHERE mode is NULL AND u_time > ?", [last_time]).then(result => { + ws.send(JSON.stringify({ + command: "SYNC_DELETE", + delete_data: result + })); + resolve("deleteSync"); + }).catch(error => { + console.error(error); + reject("deleteSync"); + }); + }) +} + +function backupSync_data(last_time, ws) { + const sendTable = (table, id_list) => new Promise((res, rej) => { + DB.query("SELECT * FROM ?? WHERE id IN (?)", [table, id_list]) + .then(data => { + ws.send(JSON.stringify({ + table, + command: "SYNC_UPDATE", + data + })); + res(table); + }).catch(error => { + console.error(error); + rej(table); + }); + }); + return new Promise((resolve, reject) => { + DB.query("SELECT * FROM _backup WHERE mode=TRUE AND u_time > ?", [last_time]).then(result => { + let sync_needed = {}; + result.forEach(r => r.t_name in sync_needed ? sync_needed[r.t_name].push(r.id) : sync_needed[r.t_name] = [r.id]); + ws.send(JSON.stringify({ + command: "SYNC_HEADER", + add_data: result + })); + let promises = []; + for (let table in sync_needed) + promises.push(sendTable(table, sync_needed[table])); + Promise.allSettled(promises).then(result => { + let failedTables = []; + result.forEach(r => r.status === "rejected" ? 
failedTables.push(r.reason) : null); + if (failedTables.length) + reject(["dataSync", failedTables]); + else + resolve("dataSync"); + }); + }).catch(error => { + console.error(error); + reject("dataSync"); + }); + }); +} + +function backupSync_checksum(ws) { + return new Promise((resolve, reject) => { + DB.query("SELECT DISTINCT t_name FROM _backup").then(result => { + let tableList = result.map(r => r['t_name']); + if (!tableList.length) + return resolve("checksum"); + DB.query("CHECKSUM TABLE ??", [tableList]).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + ws.send(JSON.stringify({ + command: "SYNC_CHECKSUM", + checksum: checksum + })); + resolve("checksum"); + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }) + +} + +function sendTableHash(tables, ws) { + Promise.allSettled(tables.map(t => getTableHashes(t))).then(result => { + let hashes = {}; + for (let i in tables) + if (result[i].status === "fulfilled") + hashes[tables[i]] = result[i].value; + else + console.error(result[i].reason); + ws.send(JSON.stringify({ + command: "SYNC_HASH", + hashes: hashes + })); + }) +} + +function getTableHashes(table) { + return new Promise((resolve, reject) => { + DB.query("SHOW COLUMNS FROM ??", [table]).then(result => { + //columns + let columns = result.map(r => r["Field"]).sort(); + //select statement + let statement = "SELECT CEIL(id/?) as group_id,"; + let query_values = [HASH_N_ROW]; + //aggregate column values + let col_aggregate = columns.map(c => "IFNULL(CRC32(??), 0)").join('+'); + columns.forEach(c => query_values.push(c)); + //aggregate rows via group by + statement += " SUM(CRC32(MD5(" + col_aggregate + "))) as hash FROM ?? 
GROUP BY group_id ORDER BY group_id"; + query_values.push(table); + //query + DB.query(statement, query_values) + .then(result => resolve(Object.fromEntries(result.map(r => [r.group_id, r.hash])))) + .catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function sendTableData(tables, ws) { + let promises = [ + tableSync_data(tables, ws), + tableSync_delete(tables, ws), + tableSync_checksum(Object.keys(tables), ws) + ]; + Promise.allSettled(promises).then(result => { + let failedSync = []; + result.forEach(r => r.status === "rejected" ? failedSync.push(r.reason) : null); + if (failedSync.length) { + console.info("Backup Sync Failed:", failedSync); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: false, + info: failedSync + })); + } else { + console.info("Backup Sync completed"); + ws.send(JSON.stringify({ + command: "SYNC_END", + status: true + })); + } + }); +} + +function tableSync_delete(tables, ws) { + let getDelete = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_N_ROW, + id_start = id_end - HASH_N_ROW + 1; + DB.query("SELECT * FROM _backup WHERE t_name=? AND mode is NULL AND (id BETWEEN ? 
AND ?)", [table, id_start, id_end]) + .then(result => res(result)) + .catch(error => rej(error)) + }) + return new Promise((resolve, reject) => { + let promises = []; + for (let t in tables) + for (let g_id in tables[t][1]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(getDelete(t, g_id)); + Promise.allSettled(promises).then(results => { + let delete_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results + delete_sync = [].concat(...delete_sync); //Convert 2d array into 1d + ws.send(JSON.stringify({ + command: "SYNC_DELETE", + delete_data: delete_sync + })); + resolve("deleteSync"); + }); + }) +} + +function tableSync_data(tables, ws) { + const sendTable = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_N_ROW, + id_start = id_end - HASH_N_ROW + 1; + DB.query("SELECT * FROM ?? WHERE id BETWEEN ? AND ?", [table, id_start, id_end]).then(data => { + ws.send(JSON.stringify({ + table, + command: "SYNC_UPDATE", + data + })); + res(table); + }).catch(error => { + console.error(error); + rej(table); + }); + }); + const getUpdate = (table, group_id) => new Promise((res, rej) => { + let id_end = group_id * HASH_N_ROW, + id_start = id_end - HASH_N_ROW + 1; + DB.query("SELECT * FROM _backup WHERE t_name=? AND mode=TRUE AND (id BETWEEN ? 
AND ?)", [table, id_start, id_end]) + .then(result => res(result)) + .catch(error => rej(error)) + }) + return new Promise((resolve, reject) => { + let promises = []; + for (let t in tables) + for (let g_id of tables[t][0]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(getUpdate(t, g_id)); + Promise.allSettled(promises).then(results => { + let update_sync = results.filter(r => r.status === "fulfilled").map(r => r.value); //Filtered results + update_sync = [].concat(...update_sync); //Convert 2d array into 1d + ws.send(JSON.stringify({ + command: "SYNC_HEADER", + add_data: update_sync + })); + let promises = []; + for (let t in tables) + for (let g_id of tables[t][0]) //tables[t] is [convertIntArray(hash_ref), convertIntArray(hash_cur)] + promises.push(sendTable(t, g_id)); + Promise.allSettled(promises).then(result => { + let failedTables = []; + result.forEach(r => r.status === "rejected" ? failedTables.push(r.reason) : null); + if (failedTables.length) + reject(["dataSync", [...new Set(failedTables)]]); + else + resolve("dataSync"); + }); + }); + }) +} + +function tableSync_checksum(tables, ws) { + return new Promise((resolve, reject) => { + DB.query("CHECKSUM TABLE ??", [tables]).then(result => { + let checksum = Object.fromEntries(result.map(r => [r.Table.split(".").pop(), r.Checksum])); + ws.send(JSON.stringify({ + command: "SYNC_CHECKSUM", + checksum: checksum + })); + resolve("checksum"); + }).catch(error => { + console.error(error); + reject("checksum"); + }) + }) + +} + +module.exports = { + getTableHashes, + sendBackupData, + sendTableHash, + sendTableData +} \ No newline at end of file diff --git a/src/blockchain.js b/src/blockchain.js new file mode 100644 index 0000000..1209580 --- /dev/null +++ b/src/blockchain.js @@ -0,0 +1,123 @@ +'use strict'; + +const pCode = require('../docs/scripts/floTradeAPI').processCode; +const { collectAndCall } = require('./backup/head'); +const keys = require('./keys'); +const DB = 
require("./database"); + +const TYPE_VAULT = "VAULT" + +const SINK_GROUP = { + [TYPE_VAULT]: keys.sink_groups.TRADE +} + +const balance_locked = {}, + balance_cache = {}, + callbackCollection = { + [TYPE_VAULT]: {} + }; + +function getBalance(sinkID, asset) { + switch (asset) { + case "FLO": + return floBlockchainAPI.getBalance(sinkID); + case "BTC": + let btc_id = btcOperator.convert.legacy2bech(sinkID); + return btcOperator.getBalance(btc_id); + default: + return floTokenAPI.getBalance(sinkID, asset); + } +} + +function getSinkID(type, quantity, asset, sinkList = null) { + return new Promise((resolve, reject) => { + if (!sinkList) + sinkList = keys.sink_chest.list(SINK_GROUP[type]).map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting + .sort((a, b) => b[1] - a[1]).map(x => x[0]); + if (!sinkList.length) + return reject(`Insufficient balance for asset(${asset}) in chest(${SINK_GROUP[type]})`); + let sinkID = sinkList.shift(); + getBalance(sinkID, asset).then(balance => { + if (!(sinkID in balance_cache)) + balance_cache[sinkID] = {}; + balance_cache[sinkID][asset] = balance; + if (balance > (quantity + (sinkID in balance_locked ? 
balance_locked[sinkID][asset] || 0 : 0))) + return resolve(sinkID); + else + getSinkID(type, quantity, asset, sinkList) + .then(result => resolve(result)) + .catch(error => reject(error)) + }).catch(error => { + console.error(error); + getSinkID(type, quantity, asset, sinkList) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); + }) +} + +const WITHDRAWAL_MESSAGE = { + [TYPE_VAULT]: "(withdrawal from market)" +} + +function sendTx(floID, asset, quantity, sinkID, sinkKey, message) { + switch (asset) { + case "FLO": + return floBlockchainAPI.sendTx(sinkID, floID, quantity, sinkKey, message); + case "BTC": + let btc_sinkID = btcOperator.convert.legacy2bech(sinkID), + btc_receiver = btcOperator.convert.legacy2bech(floID); + return btcOperator.sendTx(btc_sinkID, sinkKey, btc_receiver, quantity, null, { fee_from_receiver: true }); + default: + return floTokenAPI.sendToken(sinkKey, quantity, floID, message, asset); + } +} + +const updateSyntax = { + [TYPE_VAULT]: "UPDATE VaultTransactions SET r_status=?, txid=? WHERE id=?" 
+}; + +function sendAsset(floID, asset, quantity, type, id) { + quantity = global.toStandardDecimal(quantity); + getSinkID(type, quantity, asset).then(sinkID => { + let callback = (sinkKey) => { + //Send asset to user via API + sendTx(floID, asset, quantity, sinkID, sinkKey, WITHDRAWAL_MESSAGE[type]).then(txid => { + if (!txid) + console.error("Transaction not successful"); + else //Transaction was successful, Add in database + DB.query(updateSyntax[type], [pCode.STATUS_CONFIRMATION, txid, id]) + .then(_ => null).catch(error => console.error(error)); + }).catch(error => console.error(error)).finally(_ => { + delete callbackCollection[type][id]; + balance_locked[sinkID][asset] -= quantity; + }); + } + collectAndCall(sinkID, callback); //TODO: add timeout to prevent infinite wait + callbackCollection[type][id] = callback; + if (!(sinkID in balance_locked)) + balance_locked[sinkID] = {}; + balance_locked[sinkID][asset] = (balance_locked[sinkID][asset] || 0) + quantity; + }).catch(error => console.error(error)) +} + +function withdrawAsset_init(floID, asset, amount) { + amount = global.toStandardDecimal(amount); + let asset_type = ["FLO", "BTC"].includes(asset) ? 
pCode.ASSET_TYPE_COIN : pCode.ASSET_TYPE_TOKEN; + DB.query("INSERT INTO VaultTransactions (floID, mode, asset_type, asset, amount, r_status) VALUES (?)", [[floID, pCode.VAULT_MODE_WITHDRAW, asset_type, asset, amount, pCode.STATUS_PENDING]]) + .then(result => sendAsset(floID, asset, amount, TYPE_VAULT, result.insertId)) + .catch(error => console.error(error)) +} + +function withdrawAsset_retry(floID, asset, amount, id) { + if (id in callbackCollection[TYPE_VAULT]) + console.debug("A callback is already pending for this Coin transfer"); + else sendAsset(floID, asset, amount, TYPE_VAULT, id); +} + +module.exports = { + withdrawAsset: { + init: withdrawAsset_init, + retry: withdrawAsset_retry + } +} \ No newline at end of file diff --git a/src/coupling.js b/src/coupling.js new file mode 100644 index 0000000..e494199 --- /dev/null +++ b/src/coupling.js @@ -0,0 +1,203 @@ +'use strict'; + +const price = require("./price"); +const DB = require("./database"); + +const { + WAIT_TIME, + TRADE_HASH_PREFIX +} = require("./_constants")["market"]; + +const updateBalance = {}; +updateBalance.consume = (floID, asset, amount) => ["UPDATE UserBalance SET quantity=quantity-? WHERE floID=? AND asset=?", [amount, floID, asset]]; +updateBalance.add = (floID, asset, amount) => ["INSERT INTO UserBalance (floID, asset, quantity) VALUE (?) 
ON DUPLICATE KEY UPDATE quantity=quantity+?", [[floID, asset, amount], amount]]; + +const couplingInstance = {}, + couplingTimeout = {}; + +function stopAllInstance() { + for (let asset in couplingTimeout) { + if (couplingTimeout[asset]) + clearTimeout(couplingTimeout[asset]); + delete couplingInstance[asset]; + delete couplingTimeout[asset]; + } +} + +function startCouplingForAsset(asset) { + console.debug("startCouplingForAsset") + if (couplingInstance[asset] === true) { //if coupling is already running for asset + /*if (updatePrice) { //wait until current instance is over + if (couplingTimeout[asset]) clearTimeout(couplingTimeout[asset]); + couplingTimeout[asset] = setTimeout(() => startCouplingForAsset(asset, true), WAIT_TIME); + }*/ + return; + } + couplingInstance[asset] = true; //set instance as running + recursiveCoupling(asset); +} + +const getBestSeller = (asset) => new Promise((resolve, reject) => { + DB.query("SELECT SellOrder.id, SellOrder.floID, SellOrder.quantity, SellOrder.price FROM SellOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = SellOrder.floID AND UserBalance.asset = SellOrder.asset" + + " WHERE UserBalance.quantity >= SellOrder.quantity AND SellOrder.asset = ?" + + " ORDER BY SellOrder.price ASC, SellOrder.time_placed ASC" + + " LIMIT 1", [asset] + ).then(result => { + if (result.length) + resolve(result[0]); + else + resolve(null); + }).catch(error => reject(error)) +}); + +const getBestBuyer = (asset, sell_price) => new Promise((resolve, reject) => { + DB.query("SELECT BuyOrder.id, BuyOrder.floID, BuyOrder.quantity FROM BuyOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = BuyOrder.floID AND UserBalance.asset = ?" + + " WHERE UserBalance.quantity >= BuyOrder.maxPrice * BuyOrder.quantity AND BuyOrder.asset = ? AND BuyOrder.maxPrice >= ?" 
+ + " ORDER BY BuyOrder.maxPrice DESC, BuyOrder.time_placed ASC" + + " LIMIT 1", [floGlobals.currency, asset, sell_price] + ).then(result => { + if (result.length) + resolve(result[0]); + else + resolve(null); + }).catch(error => reject(error)) +}); + +function recursiveCoupling(asset) { + console.debug("recursiveCoupling") + processCoupling(asset).then(result => { + if (!result) { //no valid orders to pair: exit pairing + console.debug("No valid orders to pair"); + delete couplingInstance[asset]; + return; + } + console.log(result); + if (couplingInstance[asset] === true) + recursiveCoupling(asset); + }).catch(error => { + console.error(error) + delete couplingInstance[asset]; + }) +} + +function processCoupling(asset) { + return new Promise((resolve, reject) => { + getBestSeller(asset).then(best_sell => { + console.debug("Sell:", best_sell); + if (!best_sell) //no valid sell orders + return resolve(null); + let sell_rate = best_sell.price; + getBestBuyer(asset, sell_rate).then(best_buy => { + console.debug("Buy:", best_buy); + if (!best_buy) //no valid buy orders + return resolve(null); + let quantity = Math.min(best_buy.quantity, best_sell.quantity); + let txQueries = processOrders(best_sell, best_buy, asset, sell_rate, quantity); + //begin audit + beginAudit(best_sell.floID, best_buy.floID, asset, sell_rate, quantity).then(audit => { + //process txn query in SQL + DB.transaction(txQueries).then(_ => { + audit.end(); + price.storeHistory(asset, sell_rate); + resolve(`Transaction was successful! 
BuyOrder:${best_buy.id}| SellOrder:${best_sell.id}`) + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function processOrders(seller_best, buyer_best, asset, sell_rate, quantity) { + let txQueries = []; + if (quantity > buyer_best.quantity || quantity > seller_best.quantity) + throw Error("Tx quantity cannot be more than order quantity"); + + //Process Buy Order + if (quantity == buyer_best.quantity) + txQueries.push(["DELETE FROM BuyOrder WHERE id=?", [buyer_best.id]]); + else + txQueries.push(["UPDATE BuyOrder SET quantity=quantity-? WHERE id=?", [quantity, buyer_best.id]]); + + //Process Sell Order + if (quantity == seller_best.quantity) + txQueries.push(["DELETE FROM SellOrder WHERE id=?", [seller_best.id]]); + else + txQueries.push(["UPDATE SellOrder SET quantity=quantity-? WHERE id=?", [quantity, seller_best.id]]); + + //Update cash/asset balance for seller and buyer + let totalAmount = sell_rate * quantity; + txQueries.push(updateBalance.add(seller_best.floID, floGlobals.currency, totalAmount)); + txQueries.push(updateBalance.consume(buyer_best.floID, floGlobals.currency, totalAmount)); + txQueries.push(updateBalance.consume(seller_best.floID, asset, quantity)); + txQueries.push(updateBalance.add(buyer_best.floID, asset, quantity)); + + //Record transaction + let time = Date.now(); + let hash = TRADE_HASH_PREFIX + Crypto.SHA256(JSON.stringify({ + seller: seller_best.floID, + buyer: buyer_best.floID, + asset: asset, + quantity: quantity, + unitValue: sell_rate, + tx_time: time, + })); + txQueries.push([ + "INSERT INTO TradeTransactions (seller, buyer, asset, quantity, unitValue, tx_time, txid) VALUES (?)", + [[seller_best.floID, buyer_best.floID, asset, quantity, sell_rate, new Date(time), hash]] + ]); + + return txQueries; +} + +function beginAudit(sellerID, buyerID, asset, unit_price, quantity) { + return new Promise((resolve, reject) => { + 
auditBalance(sellerID, buyerID, asset).then(old_bal => resolve({ + end: () => endAudit(sellerID, buyerID, asset, old_bal, unit_price, quantity) + })).catch(error => reject(error)) + }) +} + +function endAudit(sellerID, buyerID, asset, old_bal, unit_price, quantity) { + auditBalance(sellerID, buyerID, asset).then(new_bal => { + DB.query("INSERT INTO AuditTrade (asset, quantity, unit_price, total_cost," + + " sellerID, seller_old_cash, seller_old_asset, seller_new_cash, seller_new_asset," + + " buyerID, buyer_old_cash, buyer_old_asset, buyer_new_cash, buyer_new_asset)" + + " Value (?)", [[ + asset, quantity, unit_price, quantity * unit_price, + sellerID, old_bal[sellerID].cash, old_bal[sellerID].asset, new_bal[sellerID].cash, new_bal[sellerID].asset, + buyerID, old_bal[buyerID].cash, old_bal[buyerID].asset, new_bal[buyerID].cash, new_bal[buyerID].asset, + ]]).then(_ => null).catch(error => console.error(error)) + }).catch(error => console.error(error)); +} + +function auditBalance(sellerID, buyerID, asset) { + return new Promise((resolve, reject) => { + let balance = { + [sellerID]: { + cash: 0, + asset: 0 + }, + [buyerID]: { + cash: 0, + asset: 0 + } + }; + DB.query("SELECT floID, quantity, asset FROM UserBalance WHERE floID IN (?) 
AND asset IN (?)", [[sellerID, buyerID], [floGlobals.currency, asset]]).then(result => { + for (let i in result) { + if (result[i].asset === floGlobals.currency) + balance[result[i].floID].cash = result[i].quantity; + else if (result[i].asset === asset) + balance[result[i].floID].asset = result[i].quantity; + } + resolve(balance); + }).catch(error => reject(error)) + }) +} + +module.exports = { + initiate: startCouplingForAsset, + stopAll: stopAllInstance, + updateBalance +} \ No newline at end of file diff --git a/src/database.js b/src/database.js new file mode 100644 index 0000000..affa349 --- /dev/null +++ b/src/database.js @@ -0,0 +1,104 @@ +'use strict'; +var mysql = require('mysql'); + +var pool;//container for connected pool; + +function connectToDatabase(user, password, dbname, host = 'localhost') { + return new Promise((resolve, reject) => { + pool = mysql.createPool({ + host: host, + user: user, + password: password, + database: dbname, + //dateStrings : true, + //timezone: 'UTC' + }); + getConnection().then(conn => { + conn.release(); + resolve(pool); + }).catch(error => reject(error)); + }); +} + +function getConnection() { + return new Promise((resolve, reject) => { + if (!pool) + return reject("Database not connected"); + pool.getConnection((error, conn) => { + if (error) + reject(error); + else + resolve(conn); + }); + }) +} + +function SQL_query(sql, values) { + return new Promise((resolve, reject) => { + getConnection().then(conn => { + const fn = (err, res) => { + conn.release(); + (err ? 
reject(err) : resolve(res)); + }; + if (values) + conn.query(sql, values, fn); + else + conn.query(sql, fn); + }).catch(error => reject(error)); + }) +} + +function SQL_transaction(queries) { + return new Promise((resolve, reject) => { + getConnection().then(conn => { + conn.beginTransaction(err => { + if (err) + conn.rollback(() => { + conn.release(); + reject(err); + }); + else { + (function queryFn(result) { + if (!queries.length) { + conn.commit(err => { + if (err) + conn.rollback(() => { + conn.release(); + reject(err); + }); + else { + conn.release(); + resolve(result); + } + }); + } else { + let q_i = queries.shift(); + const callback = function (err, res) { + if (err) + conn.rollback(() => { + conn.release(); + reject(err); + }); + else { + result.push(res); + queryFn(result); + } + }; + if (!Array.isArray(q_i)) + q_i = [q_i]; + if (q_i[1]) + conn.query(q_i[0], q_i[1], callback); + else + conn.query(q_i[0], callback); + } + })([]); + } + }); + }).catch(error => reject(error)); + }) +} +module.exports = { + connect: connectToDatabase, + query: SQL_query, + transaction: SQL_transaction +}; \ No newline at end of file diff --git a/src/keys.js b/src/keys.js new file mode 100644 index 0000000..d47b8f7 --- /dev/null +++ b/src/keys.js @@ -0,0 +1,465 @@ +'use strict'; + +const fs = require('fs'); +const path = require('path'); +const lockfile = require('proper-lockfile'); +const DB = require("./database"); + +var _I = ""; //Instance support +for (let arg of process.argv) + if (/^-I=/.test(arg)) { + _I = arg.split(/=(.*)/s)[1]; + break; + } + +const { + SHARES_PER_NODE, + SHARE_THRESHOLD, + SHUFFLE_INTERVAL +} = require("./_constants")["keys"]; + +const PRIV_EKEY_MIN = 32, + PRIV_EKEY_MAX = 48, + PRIME_FILE_TYPE = 'binary', + INDEX_FILE_TYPE = 'utf-8', + UNSIGNED_INT_MIN = 0, + UNSIGNED_INT_MAX = 4294967295, + INDEX_FILE_NAME_LENGTH = 16, + INDEX_FILE_EXT = '.txt', + MIN_DUMMY_FILES = 16, + MAX_DUMMY_FILES = 24, + MIN_DUMMY_SIZE_MUL = 0.5, + MAX_DUMMY_SIZE_MUL = 
1.5, + SIZE_FACTOR = 100, + LOCK_RETRY_MIN_TIME = 1 * 1000, + LOCK_RETRY_MAX_TIME = 2 * 1000; + +var node_priv, e_key, node_id, node_pub; //containers for node-key wrapper +const _x = { + get node_priv() { + if (!node_priv || !e_key) + throw Error("keys not set"); + return Crypto.AES.decrypt(node_priv, e_key); + }, + set node_priv(key) { + node_pub = floCrypto.getPubKeyHex(key); + node_id = floCrypto.getFloID(node_pub); + if (!key || !node_pub || !node_id) + throw Error("Invalid Keys"); + let n = floCrypto.randInt(PRIV_EKEY_MIN, PRIV_EKEY_MAX) + e_key = floCrypto.randString(n); + node_priv = Crypto.AES.encrypt(key, e_key); + }, + args_dir: path.resolve(__dirname, '..', 'args'), + get index_dir() { + return path.join(this.args_dir, `indexes${_I}`) + }, + get prime_file() { + return path.join(this.args_dir, `prime_index${_I}.b`) + }, + get index_file() { + try { + let data = fs.readFileSync(this.prime_file, PRIME_FILE_TYPE), + fname = Crypto.AES.decrypt(data, this.node_priv); + return path.join(this.index_dir, fname + INDEX_FILE_EXT); + } catch (error) { + console.debug(error); + throw Error("Prime-Index Missing/Corrupted"); + } + } +} + +function initialize() { + return new Promise((resolve, reject) => { + fs.readFile(_x.prime_file, PRIME_FILE_TYPE, (err, res) => { + var data, cur_filename, new_filename, priv_key; + try { + priv_key = _x.node_priv; + } catch (error) { + return reject(error); + } + if (!err) { + if (res.length) { //prime file not empty + try { + cur_filename = Crypto.AES.decrypt(res, priv_key); + } catch (error) { + console.debug(error); + return reject("Prime file corrupted"); + } try { //read data from index file + let tmp = fs.readFileSync(path.join(_x.index_dir, cur_filename + INDEX_FILE_EXT), INDEX_FILE_TYPE); + tmp = Crypto.AES.decrypt(tmp, priv_key); + JSON.parse(tmp); //check if data is JSON parse-able + data = tmp; + } catch (error) { + console.debug(error); + return reject("Index file corrupted"); + } + } + } + try { + if 
(!fs.existsSync(_x.index_dir)) { + fs.mkdirSync(_x.index_dir); + } + } catch (error) { + console.debug(error); + return reject("Index directory creation failed"); + } + try { //delete all old dummy files + let files = fs.readdirSync(_x.index_dir); + for (const file of files) + if (!cur_filename || file !== cur_filename + INDEX_FILE_EXT) //check if file is current file + fs.unlinkSync(path.join(_x.index_dir, file)); + } catch (error) { + console.debug(error); + return reject("Clear index directory failed"); + } try { //create files (dummy and new index file) + let N = floCrypto.randInt(MIN_DUMMY_FILES, MAX_DUMMY_FILES), + k = floCrypto.randInt(0, N); + if (typeof data === 'undefined' || data.length == 0) //no existing data, initialize + data = JSON.stringify({}); + let data_size = data.length; + for (let i = 0; i <= N; i++) { + let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH); + if (i == k) { + new_filename = f_name; + f_data = data; + } else { + let d_size = data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR); + f_data = floCrypto.randString(d_size, false); + } + f_data = Crypto.AES.encrypt(f_data, priv_key); + fs.writeFileSync(path.join(_x.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE); + } + } catch (error) { + console.debug(error); + return reject("Index file creation failed"); + } try { //update prime file + let en_filename = Crypto.AES.encrypt(new_filename, priv_key); + fs.writeFileSync(_x.prime_file, en_filename, PRIME_FILE_TYPE); + } catch (error) { + console.debug(error); + return reject("Update prime file failed"); + } + if (cur_filename) + fs.unlink(path.join(_x.index_dir, cur_filename + INDEX_FILE_EXT), err => err ? 
console.debug(err) : null); + shuffle.interval = setInterval(shuffle, SHUFFLE_INTERVAL); + resolve("Key management initiated"); + }) + }) +} + +function shuffle() { + readIndexFile().then(data => { + let new_filename, cur_filename = Crypto.AES.decrypt(fs.readFileSync(_x.prime_file, PRIME_FILE_TYPE), _x.node_priv); + fs.readdir(_x.index_dir, (err, files) => { + if (err) + return console.error(err); + data = JSON.stringify(data); + let data_size = data.length; + for (let file of files) { + let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH); + if (file === cur_filename + INDEX_FILE_EXT) { + new_filename = f_name; + f_data = data; + } else { + let d_size = data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR); + f_data = floCrypto.randString(d_size, false); + } + f_data = Crypto.AES.encrypt(f_data, _x.node_priv); + //rename and rewrite the file + try { + fs.renameSync(path.join(_x.index_dir, file), path.join(_x.index_dir, f_name + INDEX_FILE_EXT)); + fs.writeFileSync(path.join(_x.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE); + } catch (error) { + console.error(error) + } + } + //update prime file + if (!new_filename) + return console.error("Index file has not been renamed"); + let en_filename = Crypto.AES.encrypt(new_filename, _x.node_priv); + try { + fs.writeFileSync(_x.prime_file, en_filename, PRIME_FILE_TYPE); + } catch (error) { + console.error(error); + } + }) + }).catch(error => console.error(error)) +} + +function readIndexFile() { + return new Promise((resolve, reject) => { + fs.readFile(_x.index_file, INDEX_FILE_TYPE, (err, data) => { + if (err) { + console.debug(err); + return reject('Unable to read Index file'); + } + try { + data = JSON.parse(Crypto.AES.decrypt(data, _x.node_priv)); + resolve(data); + } catch { + reject("Index file corrupted"); + } + }) + }) +} + +function writeIndexFile(data) { + return new Promise((resolve, reject) => { + let en_data = 
Crypto.AES.encrypt(JSON.stringify(data), _x.node_priv); + fs.writeFile(_x.index_file, en_data, INDEX_FILE_TYPE, (err) => { + if (err) { + console.debug(err); + return reject('Unable to write Index file'); + } else resolve("Updated Index file"); + }) + }) +} + +function getShares(group, id, ignoreDiscarded = true) { + return new Promise((resolve, reject) => { + checkIfDiscarded(id).then(result => { + if (ignoreDiscarded && result != false) + return reject("Trying to get share for discarded ID"); + readIndexFile().then(data => { + if (!(group in data)) + reject("Group not found in Index file"); + else if (!(id in data[group])) + reject("ID not found in Index file"); + else { + let ref = data[group][id].shift(); + DB.query("SELECT share FROM sinkShares WHERE num IN (?)", [data[group][id]]) + .then(result => resolve({ ref, shares: result.map(r => Crypto.AES.decrypt(r.share, _x.node_priv)) })) + .catch(error => reject(error)) + } + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function storeShareAtRandom(share) { + return new Promise((resolve, reject) => { + let rand = floCrypto.randInt(UNSIGNED_INT_MIN, UNSIGNED_INT_MAX); + DB.query("INSERT INTO sinkShares(num, share) VALUE (?)", [[rand, share]]) + .then(result => resolve(result.insertId)).catch(error => { + if (error.code === "ER_DUP_ENTRY") + storeShareAtRandom(share) //try again (with diff rand_num) + .then(result => resolve(result)) + .catch(error => reject(error)) + else + reject(error); + }) + }) +} + +function addShare(group, id, ref, share) { + return new Promise((resolve, reject) => { + checkIfDiscarded(id).then(result => { + if (result != false) + return reject("Trying to store share for discarded ID"); + lockfile.lock(_x.index_file, { retries: { forever: true, minTimeout: LOCK_RETRY_MIN_TIME, maxTimeout: LOCK_RETRY_MAX_TIME } }).then(release => { + const releaseAndReject = err => { + release().then(_ => null).catch(error => console.error(error)); + reject(err); + } + 
readIndexFile().then(data => { + if (!(group in data)) + data[group] = {}; + if (!(id in data[group])) + data[group][id] = [ref]; + else if (ref < data[group][id][0]) + return reject("reference is lower than current"); + else if (ref > data[group][id][0]) { + let old_shares = data[group][id]; + data[group][id] = [ref]; + old_shares.shift(); + DB.query("DELETE FROM sinkShares WHERE num in (?)", [old_shares])//delete old shares + .then(_ => null).catch(error => console.error(error)); + } + let encrypted_share = Crypto.AES.encrypt(share, _x.node_priv); + console.debug(ref, '|sinkID:', id, '|EnShare:', encrypted_share); + storeShareAtRandom(encrypted_share).then(i => { + data[group][id].push(i); + writeIndexFile(data).then(_ => resolve(i)).catch(error => reject(error)) + .finally(_ => release().then(_ => null).catch(error => console.error(error))); + }).catch(error => releaseAndReject(error)) + }).catch(error => releaseAndReject(error)) + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function generateShares(sinkKey, total_n, min_n) { + let shares = floCrypto.createShamirsSecretShares(sinkKey, total_n * SHARES_PER_NODE, min_n * SHARES_PER_NODE * SHARE_THRESHOLD); + let node_shares = Array(total_n); + for (let i = 0; i < total_n; i++) + node_shares[i] = shares.splice(0, SHARES_PER_NODE); + return node_shares; +} + +function getStoredList(group = null) { + return new Promise((resolve, reject) => { + readIndexFile().then(data => { + if (group !== null) { + if (group in data) + resolve(Object.keys(data.group)); + else + reject("Group not found in Index file"); + } else { + let ids = {}; + for (let group in data) + ids[group] = Object.keys(data[group]); + resolve(ids); + } + }).catch(error => reject(error)) + }) +} + +function getDiscardedList() { + return new Promise((resolve, reject) => { + DB.query("SELECT floID, discard_time FROM discardedSinks") + .then(result => resolve(Object.fromEntries(result.map(r => [r.floID, r.discard_time])))) + 
.catch(error => reject(error)) + }) +} + +function checkIfDiscarded(id) { + return new Promise((resolve, reject) => { + DB.query("SELECT discard_time FROM discardedSinks WHERE floID=?", [id]) + .then(result => resolve(result.length ? result[0].discard_time : false)) + .catch(error => reject(error)) + }) +} + +function discardSink(id) { + return new Promise((resolve, reject) => { + DB.query("INSERT INTO discardedSinks(floID) VALUE (?)", [id]) + .then(result => resolve(`Discarded ${id}`)) + .catch(error => reject(error)) + }) +} + +//Sink groups and chest +const sink_groups = { + get TRADE() { return "TRADE" }, + get list() { //total list + return [this.TRADE] + }, + get initial_list() { //list to generate when starting server + return [this.TRADE] + }, + get generate_list() { //list allowed to generate + return [this.TRADE] + } +}; + +const sink_ids = {}, sink_chest = { + reset() { + for (let i in sink_ids) + delete sink_ids[i]; + }, + set_id(group, id, value) { + if (!(group in sink_ids)) + sink_ids[group] = {}; + sink_ids[group][id] = value; + }, + rm_id(group, id) { + return delete sink_ids[group][id]; + }, + get_id(group, id) { + return sink_ids[group][id]; + }, + list(group) { + return Object.keys(sink_ids[group] || {}); + }, + active_list(group) { + let ids = []; + if (group in sink_ids) + for (let id in sink_ids[group]) + if (sink_ids[group][id]) + ids.push(id); + return ids; + }, + includes(group, id) { + return group in sink_ids ? (id in sink_ids[group]) : null; + }, + isActive(group, id) { + return group in sink_ids ? 
(id in sink_ids && sink_ids[id]) : null; + }, + pick(group) { + let ids = this.list(group), + i = floCrypto.randInt(0, ids.length - 1); + return ids[i]; + }, + active_pick(group) { + let ids = this.active_list(group), + i = floCrypto.randInt(0, ids.length - 1); + return ids[i]; + }, + find_group(id) { + let group = null; + for (let g in sink_ids) + if (id in sink_ids[g]) { + group = g; break; + } + return group; + }, + get_all() { + let ids = {}; + for (let g in sink_ids) + ids[g] = Object.keys(sink_ids[g]) + return ids; + } +}; + +//Asset list +var assetList, currency, tradeList; //container for allowed assets, currency and trade asset +const assets = { + set_list(assets) { + if (Array.isArray(assets)) + assetList = assets; + }, + get list() { return Array.from(assetList) }, + includes(asset) { return assetList.includes(asset) }, + set_currency(c) { currency = c }, + get currency() { return currency }, + isCurrency(asset) { return asset === currency }, + set_tradeable(assets) { + if (Array.isArray(assets)) + tradeList = assets; + }, + get tradeList() { return Array.from(tradeList) }, + isTradeable(asset) { return tradeList.includes(asset) } +} + +module.exports = { + init: initialize, + getShares, + addShare, + generateShares, + getStoredList, + getDiscardedList, + checkIfDiscarded, + discardSink, + set node_priv(key) { + _x.node_priv = key; + }, + get node_priv() { + return _x.node_priv; + }, + get node_id() { + return node_id; + }, + get node_pub() { + return node_pub; + }, + get sink_groups() { + return sink_groups; + }, + get sink_chest() { + return sink_chest; + }, + get assets() { + return assets; + } +} \ No newline at end of file diff --git a/src/main.js b/src/main.js new file mode 100644 index 0000000..0ac24cb --- /dev/null +++ b/src/main.js @@ -0,0 +1,179 @@ +'use strict'; +global.floGlobals = require('../docs/scripts/floGlobals'); +require('./set_globals'); +require('../docs/scripts/lib'); +global.floCrypto = require('../docs/scripts/floCrypto'); 
+global.floBlockchainAPI = require('../docs/scripts/floBlockchainAPI'); +global.floTokenAPI = require('../docs/scripts/floTokenAPI'); +global.btcOperator = require('../docs/scripts/btcOperator'); + +const keys = require('./keys'); +const DB = require("./database"); +const App = require('./app'); + +const backup = require('./backup/head'); + +const { + BLOCKCHAIN_REFRESH_INTERVAL +} = require("./_constants")["app"]; + +(function () { + const { adminID, application, currency } = require("../docs/scripts/floTradeAPI"); + floGlobals.adminID = adminID; + floGlobals.application = application; + keys.assets.set_currency(currency); +})(); + +var app; + +function refreshData(startup = false) { + return new Promise((resolve, reject) => { + refreshDataFromBlockchain().then(changes => { + loadDataFromDB(changes, startup).then(_ => { + if (!startup && changes.nodes) + backup.reconstructAllActiveShares(); + //app.refreshData(backup.nodeList); + resolve("Data refresh successful") + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function refreshDataFromBlockchain() { + return new Promise((resolve, reject) => { + DB.query("SELECT txid FROM LastTx WHERE floID=?", [floGlobals.adminID]).then(result => { + var query_options = { sentOnly: true, pattern: floGlobals.application }; + + let lastTx = result.length ? 
result[0].txid : undefined; + if (typeof lastTx == 'string' && /^[0-9a-f]{64}/i.test(lastTx))//lastTx is txid of last tx + query_options.after = lastTx; + else if (!isNaN(lastTx))//lastTx is tx count (*backward support) + query_options.ignoreOld = parseInt(lastTx); + + floBlockchainAPI.readData(floGlobals.adminID, query_options).then(result => { + let promises = [], + nodes_change = false, + assets_change = false; + result.data.reverse().forEach(data => { + var content = JSON.parse(data)[floGlobals.application]; + //Node List + if (content.Nodes) { + nodes_change = true; + if (content.Nodes.remove) + for (let n of content.Nodes.remove) + promises.push(DB.query("DELETE FROM NodeList WHERE floID=?", [n])); + if (content.Nodes.add) + for (let n in content.Nodes.add) + promises.push(DB.query("INSERT INTO NodeList (floID, uri) VALUE (?) ON DUPLICATE KEY UPDATE uri=?", [[n, content.Nodes.add[n]], content.Nodes.add[n]])); + if (content.Nodes.update) + for (let n in content.Nodes.update) + promises.push(DB.query("UPDATE NodeList SET uri=? WHERE floID=?", [content.Nodes.update[n], n])); + } + //Asset List + if (content.Assets) { + assets_change = true; + for (let a in content.Assets) + promises.push(DB.query("INSERT INTO AssetList (asset, isTradeable) VALUE (?) ON DUPLICATE KEY UPDATE isTradeable=?", [[a, content.Assets[a]], content.Assets[a]])); + } + }); + promises.push(DB.query("INSERT INTO LastTx (floID, txid) VALUE (?) ON DUPLICATE KEY UPDATE txid=?", [[floGlobals.adminID, result.lastItem], result.lastItem])); + //Check if all save process were successful + Promise.allSettled(promises).then(results => { + //console.debug(results.filter(r => r.status === "rejected")); + if (results.reduce((a, r) => r.status === "rejected" ? 
++a : a, 0)) + console.warn("Some blockchain data might not have been saved in database correctly"); + resolve({ + nodes: nodes_change, + assets: assets_change + }); + }); + }).catch(error => reject(error)); + }).catch(error => reject(error)) + }) +} + +function loadDataFromDB(changes, startup) { + return new Promise((resolve, reject) => { + let promises = []; + if (startup || changes.nodes) + promises.push(loadDataFromDB.nodeList()); + if (startup || changes.assets) + promises.push(loadDataFromDB.assetList()); + Promise.all(promises) + .then(_ => resolve("Data load successful")) + .catch(error => reject(error)) + }) +} + +loadDataFromDB.nodeList = function () { + return new Promise((resolve, reject) => { + DB.query("SELECT floID, uri FROM NodeList").then(result => { + let nodes = {} + for (let i in result) + nodes[result[i].floID] = result[i].uri; + //update dependents + backup.nodeList = nodes; + resolve(nodes); + }).catch(error => reject(error)) + }) +} + +loadDataFromDB.assetList = function () { + return new Promise((resolve, reject) => { + DB.query("SELECT asset, isTradeable FROM AssetList").then(result => { + let assets = [], tradeAssets = []; + for (let i in result) { + assets.push(result[i].asset); + if (result[i].isTradeable) + tradeAssets.push(result[i].asset); + } + //update dependents + keys.assets.set_list(assets); + keys.assets.set_tradeable(tradeAssets); + resolve(assets); + }).catch(error => reject(error)) + }) +} + +module.exports = function startServer() { + let _pass, _I = ""; + for (let arg of process.argv) { + if (/^-I=/.test(arg)) + _I = arg.split(/=(.*)/s)[1]; + else if (/^-password=/i.test(arg)) + _pass = arg.split(/=(.*)/s)[1]; + } + const config = require(`../args/config${_I}.json`); + try { + let _tmp = require(`../args/keys${_I}.json`); + _tmp = floCrypto.retrieveShamirSecret(_tmp); + if (!_pass) { + console.error('Password not entered!'); + process.exit(1); + } + keys.node_priv = Crypto.AES.decrypt(_tmp, _pass); + } catch (error) { + 
console.error('Unable to load private key!'); + process.exit(1); + } + + console.log("Logged in as", keys.node_id); + + DB.connect(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(result => { + keys.init().then(result => { + console.log(result); + app = new App(config['secret']); + refreshData(true).then(_ => { + app.start(config['port']).then(result => { + console.log(result); + backup.init(app); + setInterval(() => { + refreshData() + .then(result => console.log(result)) + .catch(error => console.error(error)) + }, BLOCKCHAIN_REFRESH_INTERVAL); + }).catch(error => console.error(error)) + }).catch(error => console.error(error)) + }).catch(error => console.error(error)) + }).catch(error => console.error(error)); +}; \ No newline at end of file diff --git a/src/market.js b/src/market.js new file mode 100644 index 0000000..e4404d8 --- /dev/null +++ b/src/market.js @@ -0,0 +1,458 @@ +'use strict'; + +const coupling = require('./coupling'); +const price = require("./price"); +const background = require('./background'); +const DB = require("./database"); +const blockchain = require('./blockchain'); +const keys = require('./keys'); + +const { + TRADE_HASH_PREFIX, + TRANSFER_HASH_PREFIX +} = require('./_constants')["market"]; + +const eCode = require('../docs/scripts/floTradeAPI').errorCode; +const pCode = require('../docs/scripts/floTradeAPI').processCode; + +const updateBalance = background.updateBalance = coupling.updateBalance; + +function login(floID, proxyKey) { + return new Promise((resolve, reject) => { + DB.query("INSERT INTO UserSession (floID, proxyKey) VALUE (?) 
ON DUPLICATE KEY UPDATE session_time=DEFAULT, proxyKey=?", [[floID, proxyKey], proxyKey]) + .then(result => resolve("Login Successful")) + .catch(error => reject(error)) + }) +} + +function logout(floID) { + return new Promise((resolve, reject) => { + DB.query("DELETE FROM UserSession WHERE floID=?", [floID]) + .then(result => resolve("Logout successful")) + .catch(error => reject(error)) + }) +} + +function getRateHistory(asset, duration) { + return new Promise((resolve, reject) => { + if (!asset || !keys.assets.isTradeable(asset)) + reject(INVALID(eCode.INVALID_ASSET_NAME, `Invalid asset(${asset})`)); + else + price.getHistory(asset, duration) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) +} + +function getBalance(floID, asset) { + return new Promise((resolve, reject) => { + if (floID && !floCrypto.validateAddr(floID)) + reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID(${floID})`)); + else if (asset && !keys.assets.includes(asset)) + reject(INVALID(eCode.INVALID_ASSET_NAME, `Invalid asset(${asset})`)); + else if (!floID && !asset) + reject(INVALID(eCode.MISSING_PARAMETER, 'Missing parameters: requires atleast one (floID, asset)')); + else { + var promise; + if (floID && asset) + promise = getBalance.floID_asset(floID, asset); + else if (floID) + promise = getBalance.floID(floID); + else if (asset) + promise = getBalance.asset(asset); + promise.then(result => resolve(result)).catch(error => reject(error)) + } + }) +} + +getBalance.floID_asset = (floID, asset) => new Promise((resolve, reject) => { + DB.query("SELECT quantity AS balance FROM UserBalance WHERE floID=? AND asset=?", [floID, asset]).then(result => resolve({ + floID, + asset, + balance: result.length ? 
global.toStandardDecimal(result[0].balance) : 0 + })).catch(error => reject(error)) +}); + +getBalance.floID = (floID) => new Promise((resolve, reject) => { + DB.query("SELECT asset, quantity AS balance FROM UserBalance WHERE floID=?", [floID]).then(result => { + let response = { + floID, + balance: {} + }; + for (let row of result) + response.balance[row.asset] = global.toStandardDecimal(row.balance); + resolve(response); + }).catch(error => reject(error)) +}); + +getBalance.asset = (asset) => new Promise((resolve, reject) => { + DB.query("SELECT floID, quantity AS balance FROM UserBalance WHERE asset=?", [asset]).then(result => { + let response = { + asset: asset, + balance: {} + }; + for (let row of result) + response.balance[row.floID] = global.toStandardDecimal(row.balance); + resolve(response); + }).catch(error => reject(error)) +}); + +const getAssetBalance = (floID, asset) => new Promise((resolve, reject) => { + let promises = []; + promises.push(DB.query("SELECT IFNULL(SUM(quantity), 0) AS balance FROM UserBalance WHERE floID=? AND asset=?", [floID, asset])); + promises.push(asset === floGlobals.currency ? + DB.query("SELECT IFNULL(SUM(quantity*maxPrice), 0) AS locked FROM BuyOrder WHERE floID=?", [floID]) : + DB.query("SELECT IFNULL(SUM(quantity), 0) AS locked FROM SellOrder WHERE floID=? 
AND asset=?", [floID, asset]) + ); + Promise.all(promises).then(result => resolve({ + total: result[0][0].balance, + locked: result[1][0].locked, + net: result[0][0].balance - result[1][0].locked + })).catch(error => reject(error)) +}); + +getAssetBalance.check = (floID, asset, amount) => new Promise((resolve, reject) => { + getAssetBalance(floID, asset).then(balance => { + if (balance.total < amount) + reject(INVALID(eCode.INSUFFICIENT_BALANCE, `Insufficient ${asset}`)); + else if (balance.net < amount) + reject(INVALID(eCode.INSUFFICIENT_BALANCE, `Insufficient ${asset} (Some are locked in orders)`)); + else + resolve(true); + }).catch(error => reject(error)) +}); + +function addSellOrder(floID, asset, quantity, min_price) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(floID)) + return reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID (${floID})`)); + else if (typeof quantity !== "number" || quantity <= 0) + return reject(INVALID(eCode.INVALID_NUMBER, `Invalid quantity (${quantity})`)); + else if (typeof min_price !== "number" || min_price <= 0) + return reject(INVALID(eCode.INVALID_NUMBER, `Invalid min_price (${min_price})`)); + else if (!keys.assets.isTradeable(asset)) + return reject(INVALID(eCode.INVALID_ASSET_NAME, `Invalid asset (${asset})`)); + getAssetBalance.check(floID, asset, quantity).then(_ => { + DB.query("INSERT INTO SellOrder(floID, asset, quantity, price) VALUES (?)", [[floID, asset, quantity, min_price]]).then(result => { + resolve('Sell Order placed successfully'); + console.debug("sell order placed"); + coupling.initiate(asset); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +function addBuyOrder(floID, asset, quantity, max_price) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(floID)) + return reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID (${floID})`)); + else if (typeof quantity !== "number" || quantity <= 0) + return 
reject(INVALID(eCode.INVALID_NUMBER, `Invalid quantity (${quantity})`)); + else if (typeof max_price !== "number" || max_price <= 0) + return reject(INVALID(eCode.INVALID_NUMBER, `Invalid max_price (${max_price})`)); + else if (!keys.assets.isTradeable(asset)) + return reject(INVALID(eCode.INVALID_ASSET_NAME, `Invalid asset (${asset})`)); + getAssetBalance.check(floID, floGlobals.currency, quantity * max_price).then(_ => { + DB.query("INSERT INTO BuyOrder(floID, asset, quantity, maxPrice) VALUES (?)", [[floID, asset, quantity, max_price]]).then(result => { + console.debug("before resolve") + resolve('Buy Order placed successfully'); + console.debug("buy order placed"); + coupling.initiate(asset); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +function cancelOrder(type, id, floID) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(floID)) + return reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID (${floID})`)); + let tableName; + if (type === "buy") + tableName = "BuyOrder"; + else if (type === "sell") + tableName = "SellOrder"; + else + return reject(INVALID(eCode.INVALID_TYPE, "Invalid Order type! Order type must be buy (or) sell")); + DB.query("SELECT floID, asset FROM ?? WHERE id=?", [tableName, id]).then(result => { + if (result.length < 1) + return reject(INVALID(eCode.NOT_FOUND, "Order not found!")); + else if (result[0].floID !== floID) + return reject(INVALID(eCode.NOT_OWNER, "Order doesnt belong to the current user")); + let asset = result[0].asset; + //Delete the order + DB.query("DELETE FROM ?? 
WHERE id=?", [tableName, id]).then(result => { + resolve(tableName + "#" + id + " cancelled successfully"); + coupling.initiate(asset); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +function getAccountDetails(floID) { + return new Promise((resolve, reject) => { + let promises = [ + DB.query("SELECT asset, quantity FROM UserBalance WHERE floID=?", [floID]), + DB.query("SELECT id, asset, quantity, price, time_placed FROM SellOrder WHERE floID=?", [floID]), + DB.query("SELECT id, asset, quantity, maxPrice, time_placed FROM BuyOrder WHERE floID=?", [floID]) + ]; + Promise.allSettled(promises).then(results => { + let response = { + floID: floID, + time: Date.now() + }; + results.forEach((a, i) => { + if (a.status === "rejected") + console.error(a.reason); + else + switch (i) { + case 0: + response.tokenBalance = a.value; + break; + case 1: + response.sellOrders = a.value; + break; + case 2: + response.buyOrders = a.value; + break; + } + }); + DB.query("SELECT * FROM TradeTransactions WHERE seller=? OR buyer=?", [floID, floID]) + .then(result => response.transactions = result) + .catch(error => console.error(error)) + .finally(_ => resolve(response)); + }); + }); +} + +function getUserTransacts(floID) { + return new Promise((resolve, reject) => { + DB.query("SELECT mode, asset, amount, txid, locktime, r_status FROM VaultTransactions WHERE floID=?", [floID]) + .then(result => resolve(result)) + .catch(error => reject(error)) + }) +} + +/* +function getTransactionDetails(txid) { + return new Promise((resolve, reject) => { + let tableName, type; + if (txid.startsWith(TRANSFER_HASH_PREFIX)) { + tableName = 'TransferTransactions'; + type = 'transfer'; + } else if (txid.startsWith(TRADE_HASH_PREFIX)) { + tableName = 'TradeTransactions'; + type = 'trade'; + } else + return reject(INVALID(eCode.INVALID_TX_ID, "Invalid TransactionID")); + DB.query("SELECT * FROM ?? 
WHERE txid=?", [tableName, txid]).then(result => { + if (result.length) { + let details = result[0]; + details.type = type; + if (tableName === 'TransferTransactions') //As json object is stored for receiver in transfer (to support one-to-many) + details.receiver = JSON.parse(details.receiver); + resolve(details); + } else + reject(INVALID(eCode.NOT_FOUND, "Transaction not found")); + }).catch(error => reject(error)) + }) +} +*/ + +/* +function transferAsset(sender, receivers, asset) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(sender)) + reject(INVALID(eCode.INVALID_FLO_ID, `Invalid sender (${sender})`)); + else if (!keys.assets.includes(asset)) + reject(INVALID(eCode.INVALID_ASSET_NAME, `Invalid asset (${asset})`)); + else { + let invalidIDs = [], + totalAmount = 0; + for (let floID in receivers) + if (!floCrypto.validateAddr(floID)) + invalidIDs.push(floID); + else + totalAmount += receivers[floID]; + if (invalidIDs.length) + reject(INVALID(eCode.INVALID_FLO_ID, `Invalid receiver (${invalidIDs})`)); + else getAssetBalance.check(sender, asset, totalAmount).then(_ => { + let txQueries = []; + txQueries.push(updateBalance.consume(sender, asset, totalAmount)); + for (let floID in receivers) + txQueries.push(updateBalance.add(floID, asset, receivers[floID])); + let time = Date.now(); + let hash = TRANSFER_HASH_PREFIX + Crypto.SHA256(JSON.stringify({ + sender: sender, + receiver: receivers, + asset: asset, + totalAmount: totalAmount, + tx_time: time, + })); + txQueries.push([ + "INSERT INTO TransferTransactions (sender, receiver, asset, totalAmount, tx_time, txid) VALUE (?)", + [[sender, JSON.stringify(receivers), asset, totalAmount, new Date(time), hash]] + ]); + DB.transaction(txQueries) + .then(result => resolve(hash)) + .catch(error => reject(error)) + }).catch(error => reject(error)) + } + }) +} +*/ + +function depositAsset(floID, asset, txid) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(floID)) + 
return reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID (${floID})`)); + else if (!keys.assets.includes(asset)) + return reject(INVALID(eCode.INVALID_ASSET_NAME, "Invalid Asset")); + var f; + switch (asset) { + case "FLO": f = depositFLO(floID, txid); break; + case "BTC": f = depositBTC(floID, txid); break; + default: f = depositToken(floID, txid); break; + } + f.then(result => resolve(result)).catch(error => reject(error)); + }) +} + +function withdrawAsset(floID, asset, amount) { + return new Promise((resolve, reject) => { + if (!floCrypto.validateAddr(floID)) + return reject(INVALID(eCode.INVALID_FLO_ID, `Invalid floID (${floID})`)); + else if (!keys.assets.includes(asset)) + return reject(INVALID(eCode.INVALID_ASSET_NAME, "Invalid Asset")); + else if (typeof amount !== "number" || amount <= 0) + return reject(INVALID(eCode.INVALID_NUMBER, `Invalid amount (${amount})`)); + var f; + switch (asset) { + case "FLO": f = withdrawFLO(floID, txid); break; + case "BTC": f = withdrawBTC(floID, txid); break; + default: f = withdrawToken(floID, asset, txid); break; + } + f.then(result => resolve(result)).catch(error => reject(error)); + }) +} + +function depositFLO(floID, txid) { + return new Promise((resolve, reject) => { + DB.query("SELECT r_status FROM VaultTransactions WHERE txid=? AND floID=? 
AND asset=?", [txid, floID, "FLO"]).then(result => { + if (result.length) { + switch (result[0].r_status) { + case pCode.STATUS_PENDING: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already in process")); + case pCode.STATUS_REJECTED: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already rejected")); + case pCode.STATUS_SUCCESS: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already used to add coins")); + } + } else + DB.query("INSERT INTO VaultTransactions(floID, mode, asset_type, asset, txid, r_status) VALUES (?)", [[floID, pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_COIN, "FLO", txid, pCode.STATUS_PENDING]]) + .then(result => resolve("Deposit request in process")) + .catch(error => reject(error)); + }).catch(error => reject(error)) + }); +} + +function withdrawFLO(floID, amount) { + return new Promise((resolve, reject) => { + getAssetBalance.check(floID, "FLO", amount).then(_ => { + let txQueries = []; + txQueries.push(updateBalance.consume(floID, "FLO", amount)); + DB.transaction(txQueries).then(result => { + blockchain.withdrawAsset.init(floID, "FLO", amount); + resolve("Withdrawal request is in process"); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +function depositBTC(floID, txid) { + return new Promise((resolve, reject) => { + DB.query("SELECT r_status FROM VaultTransactions WHERE txid=? AND floID=? 
AND asset=?", [txid, floID, "BTC"]).then(result => { + if (result.length) { + switch (result[0].r_status) { + case pCode.STATUS_PENDING: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already in process")); + case pCode.STATUS_REJECTED: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already rejected")); + case pCode.STATUS_SUCCESS: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already used to add coins")); + } + } else + DB.query("INSERT INTO VaultTransactions(floID, mode, asset_type, asset, txid, r_status) VALUES (?)", [[floID, pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_COIN, "BTC", txid, pCode.STATUS_PENDING]]) + .then(result => resolve("Deposit request in process")) + .catch(error => reject(error)); + }).catch(error => reject(error)) + }); +} + +function withdrawBTC(floID, amount) { + return new Promise((resolve, reject) => { + getAssetBalance.check(floID, "BTC", amount).then(_ => { + let txQueries = []; + txQueries.push(updateBalance.consume(floID, "BTC", amount)); + DB.transaction(txQueries).then(result => { + blockchain.withdrawAsset.init(floID, "BTC", amount); + resolve("Withdrawal request is in process"); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +function depositToken(floID, txid) { + return new Promise((resolve, reject) => { + DB.query("SELECT r_status FROM VaultTransactions WHERE txid=? AND floID=? 
AND asset_type=?", [txid, floID, pCode.ASSET_TYPE_TOKEN]).then(result => { + if (result.length) { + switch (result[0].r_status) { + case pCode.STATUS_PENDING: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already in process")); + case pCode.STATUS_REJECTED: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already rejected")); + case pCode.STATUS_SUCCESS: + return reject(INVALID(eCode.DUPLICATE_ENTRY, "Transaction already used to add tokens")); + } + } else + DB.query("INSERT INTO VaultTransactions(floID, mode, asset_type, txid, r_status) VALUES (?)", [[floID, pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_TOKEN, txid, pCode.STATUS_PENDING]]) + .then(result => resolve("Deposit request in process")) + .catch(error => reject(error)); + }).catch(error => reject(error)) + }); +} + +function withdrawToken(floID, token, amount) { + return new Promise((resolve, reject) => { + //Check for FLO balance (transaction fee) + let required_flo = floGlobals.sendAmt + floGlobals.fee; + getAssetBalance.check(floID, "FLO", required_flo).then(_ => { + getAssetBalance.check(floID, token, amount).then(_ => { + let txQueries = []; + txQueries.push(updateBalance.consume(floID, "FLO", required_flo)); + txQueries.push(updateBalance.consume(floID, token, amount)); + DB.transaction(txQueries).then(result => { + //Send Token to user via token API + blockchain.withdrawAsset.init(floID, token, amount); + resolve("Withdrawal request is in process"); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }).catch(error => reject(error)); + }); +} + +module.exports = { + login, + logout, + addBuyOrder, + addSellOrder, + cancelOrder, + getRateHistory, + getBalance, + getAccountDetails, + getUserTransacts, + //getTransactionDetails, + //transferAsset, + depositAsset, + withdrawAsset +}; \ No newline at end of file diff --git a/src/pairing.js b/src/pairing.js new file mode 100644 index 0000000..364224d --- /dev/null +++ b/src/pairing.js @@ -0,0 +1,27 @@ +const 
getBestSeller = (asset) => new Promise((resolve, reject) => { + DB.query("SELECT SellOrder.id, SellOrder.floID, SellOrder.quantity FROM SellOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = SellOrder.floID AND UserBalance.asset = SellOrder.asset" + + " WHERE UserBalance.quantity >= SellOrder.quantity AND SellOrder.asset = ?" + + " ORDER BY SellOrder.price ASC, SellOrder.time_placed ASC" + + " LIMIT 1", [asset] + ).then(result => { + if (result.length) + resolve(result[0]); + else + reject(null); + }).catch(error => reject(error)) +}); + +const getBestBuyer = (asset) => new Promise((resolve, reject) => { + DB.query("SELECT BuyOrder.id, BuyOrder.floID, BuyOrder.quantity FROM BuyOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = BuyOrder.floID AND UserBalance.asset = ?" + + " WHERE UserBalance.quantity >= BuyOrder.maxPrice * BuyOrder.quantity AND BuyOrder.asset = ?" + + " ORDER BY BuyOrder.maxPrice DESC, BuyOrder.time_placed ASC" + + " LIMIT 1", [floGlobals.currency, asset] + ).then(result => { + if (result.length) + resolve(result[0]); + else + reject(null); + }).catch(error => reject(error)) +}); \ No newline at end of file diff --git a/src/price.js b/src/price.js new file mode 100644 index 0000000..764bd37 --- /dev/null +++ b/src/price.js @@ -0,0 +1,61 @@ +'use strict'; +const DB = require("./database"); + +var currentRate = {}; //container for FLO price (from API or by model) + +//store FLO price in database every 1 hr +function storeHistory(asset, rate) { + DB.query("INSERT INTO PriceHistory (asset, rate) VALUE (?)", [[asset, global.toStandardDecimal(rate)]]) + .then(_ => null).catch(error => console.error(error)) +} + +function getHistory(asset, duration = '') { + return new Promise((resolve, reject) => { + let { statement, values } = getHistory.getRateStatement(asset, duration); + DB.query(statement, values) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); +} + +getHistory.statement = { + 'all-time': "SELECT 
DATE(rec_time) AS time, AVG(rate) as rate FROM PriceHistory WHERE asset=? GROUP BY time ORDER BY time", + 'year': "SELECT DATE(rec_time) AS time, AVG(rate) as rate FROM PriceHistory WHERE asset=? AND rec_time >= NOW() - INTERVAL ? year GROUP BY time ORDER BY time", + 'month': "SELECT DATE(rec_time) AS time, AVG(rate) as rate FROM PriceHistory WHERE asset=? AND rec_time >= NOW() - INTERVAL ? month GROUP BY time ORDER BY time", + 'week': "SELECT rec_time AS time, rate FROM PriceHistory WHERE asset=? AND rec_time >= NOW() - INTERVAL ? week ORDER BY time", + 'day': "SELECT rec_time AS time, rate FROM PriceHistory WHERE asset=? AND rec_time >= NOW() - INTERVAL ? day ORDER BY time" +} + +getHistory.getRateStatement = (asset, duration) => { + let n = duration.match(/\d+/g), + d = duration.match(/\D+/g); + n = n ? n[0] || 1 : 1; + d = d ? d[0].replace(/[-\s]/g, '') : ""; + + switch (d.toLowerCase()) { + case "day": + case "days": + return { statement: getHistory.statement['day'], values: [asset, n] }; + case "week": + case "weeks": + return { statement: getHistory.statement['week'], values: [asset, n] }; + case "month": + case "months": + return { statement: getHistory.statement['month'], values: [asset, n] }; + case "year": + case "years": + return { statement: getHistory.statement['year'], values: [asset, n] }; + case "alltime": + return { statement: getHistory.statement['all-time'], values: [asset] }; + default: + return { statement: getHistory.statement['day'], values: [asset, 1] }; + } +} + +module.exports = { + getHistory, + storeHistory, + get currentRates() { + return Object.assign({}, currentRate); + } +} \ No newline at end of file diff --git a/src/request.js b/src/request.js new file mode 100644 index 0000000..3a70c7f --- /dev/null +++ b/src/request.js @@ -0,0 +1,483 @@ +'use strict'; +const DB = require("./database"); + +const market = require("./market"); +const background = require("./background"); +const sink = require("./backup/head").sink; +const keys = 
require("./keys"); + +const { + SIGN_EXPIRE_TIME, + MAX_SESSION_TIMEOUT +} = require("./_constants")["request"]; + +const eCode = require('../docs/scripts/floTradeAPI').errorCode; +const serviceList = require('../docs/scripts/floTradeAPI').serviceList; + +var secret; //containers for secret + +global.INVALID = function (ecode, message) { + if (!(this instanceof INVALID)) + return new INVALID(ecode, message); + this.message = message; + this.ecode = ecode; +} +INVALID.e_code = 400; +INVALID.prototype.toString = function () { + return "E" + this.ecode + ": " + this.message; +} +INVALID.str = (ecode, message) => INVALID(ecode, message).toString(); + +global.INTERNAL = function INTERNAL(message) { + if (!(this instanceof INTERNAL)) + return new INTERNAL(message); + this.message = message; +} +INTERNAL.e_code = 500; +INTERNAL.prototype.toString = function () { + return "E" + eCode.INTERNAL_ERROR + ": " + this.message; +} +INTERNAL.str = (ecode, message) => INTERNAL(ecode, message).toString(); + +const INCORRECT_SERVER_ERROR = INVALID(eCode.INCORRECT_SERVER, "Incorrect server"); + +var serving; + +function validateRequest(request, sign, floID, pubKey) { + return new Promise((resolve, reject) => { + if (!serving) + reject(INCORRECT_SERVER_ERROR); + else if (!request.timestamp) + reject(INVALID(eCode.MISSING_PARAMETER, "Timestamp parameter missing")); + else if (Date.now() - SIGN_EXPIRE_TIME > request.timestamp) + reject(INVALID(eCode.EXPIRED_SIGNATURE, "Signature Expired")); + else if (!floCrypto.validateAddr(floID)) + reject(INVALID(eCode.INVALID_FLO_ID, "Invalid floID")); + else if (typeof request !== "object") + reject(INVALID(eCode.INVALID_REQUEST_FORMAT, "Request is not an object")); + else validateRequest.getSignKey(floID, pubKey).then(signKey => { + let req_str = Object.keys(request).sort().map(r => r + ":" + request[r]).join("|"); + try { + if (!floCrypto.verifySign(req_str, sign, signKey)) + reject(INVALID(eCode.INVALID_SIGNATURE, "Invalid request signature")); + 
else validateRequest.checkIfSignUsed(sign) + .then(result => resolve(req_str)) + .catch(error => reject(error)) + } catch { + reject(INVALID(eCode.INVALID_SIGNATURE, "Corrupted sign/key")); + } + }).catch(error => reject(error)); + }); +} + +validateRequest.getSignKey = (floID, pubKey) => new Promise((resolve, reject) => { + if (!pubKey) + DB.query("SELECT session_time, proxyKey FROM UserSession WHERE floID=?", [floID]).then(result => { + if (result.length < 1) + reject(INVALID(eCode.SESSION_INVALID, "Session not active")); + else if (result[0].session_time + MAX_SESSION_TIMEOUT < Date.now()) + reject(INVALID(eCode.SESSION_EXPIRED, "Session Expired! Re-login required")); + else + resolve(result[0].proxyKey); + }).catch(error => reject(error)); + else if (floCrypto.getFloID(pubKey) === floID) + resolve(pubKey); + else + reject(INVALID(eCode.INVALID_PUBLIC_KEY, "Invalid pubKey")); +}); + +validateRequest.checkIfSignUsed = sign => new Promise((resolve, reject) => { + DB.query("SELECT id FROM RequestLog WHERE sign=?", [sign]).then(result => { + if (result.length) + reject(INVALID(eCode.DUPLICATE_SIGNATURE, "Duplicate signature")); + else + resolve(true); + }).catch(error => reject(error)) +}); + +function logRequest(floID, req_str, sign, proxy = false) { + //console.debug(floID, req_str); + DB.query("INSERT INTO RequestLog (floID, request, sign, proxy) VALUES (?)", [[floID, req_str, sign, proxy]]) + .then(_ => null).catch(error => console.error(error)); +} + +function processRequest(res, floID, pubKey, sign, rText, validateObj, marketFn, log = true) { + validateRequest(validateObj, sign, floID, pubKey).then(req_str => { + marketFn().then(result => { + if (log) logRequest(floID, req_str, sign, !pubKey); + res.send(result); + }).catch(error => { + if (error instanceof INVALID) + res.status(INVALID.e_code).send(error.toString()); + else { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str(rText + " failed! 
Try again later!")); + } + }) + }).catch(error => { + if (error instanceof INVALID) + res.status(INVALID.e_code).send(error.toString()); + else { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str("Request processing failed! Try again later!")); + } + }) +} + +/* User Requests */ + +function Account(req, res) { + let data = req.body; + validateRequest({ + type: "get_account", + timestamp: data.timestamp + }, data.sign, data.floID, data.pubKey).then(req_str => { + market.getAccountDetails(data.floID).then(result => { + res.send(result); + }); + }).catch(error => { + if (error instanceof INVALID) + res.status(INVALID.e_code).send(error.toString()); + else { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str("Request processing failed! Try again later!")); + } + }); +} + +function Login(req, res) { + let data = req.body; + if (!data.code || data.hash != Crypto.SHA1(data.code + secret)) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_LOGIN_CODE, "Invalid Code")); + else if (!data.pubKey) + res.status(INVALID.e_code).send(INVALID.str(eCode.MISSING_PARAMETER, "Public key missing")); + else + processRequest(res, data.floID, data.pubKey, data.sign, "Login", { + type: "login", + random: data.code, + proxyKey: data.proxyKey, + timestamp: data.timestamp + }, () => market.login(data.floID, data.proxyKey)); +} + +function Logout(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Logout", { + type: "logout", + timestamp: data.timestamp + }, () => market.logout(data.floID)); +} + +function PlaceSellOrder(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Sell order placement", { + type: "sell_order", + asset: data.asset, + quantity: data.quantity, + min_price: data.min_price, + timestamp: data.timestamp + }, () => market.addSellOrder(data.floID, data.asset, data.quantity, data.min_price)); +} + +function PlaceBuyOrder(req, res) { + let data = 
req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Buy order placement", { + type: "buy_order", + asset: data.asset, + quantity: data.quantity, + max_price: data.max_price, + timestamp: data.timestamp + }, () => market.addBuyOrder(data.floID, data.asset, data.quantity, data.max_price)); +} + +function CancelOrder(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Order cancellation", { + type: "cancel_order", + order: data.orderType, + id: data.orderID, + timestamp: data.timestamp + }, () => market.cancelOrder(data.orderType, data.orderID, data.floID)); +} + +/*Currently not used +function TransferAsset(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Token Transfer", { + type: "transfer_token", + receiver: JSON.stringify(data.receiver), + asset: data.asset, + timestamp: data.timestamp + }, () => market.transferAsset(data.floID, data.receiver, data.asset)); +} +*/ + +function DepositAsset(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Deposit Asset", { + type: "deposit_asset", + txid: data.txid, + asset: data.asset, + timestamp: data.timestamp + }, () => market.depositAsset(data.floID, data.asset, data.txid)); +} + +function WithdrawAsset(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "Withdraw Asset", { + type: "withdraw_asset", + asset: data.asset, + amount: data.amount, + timestamp: data.timestamp + }, () => market.withdrawAsset(data.floID, data.asset, data.amount)); +} + +function GetUserTransacts(req, res) { + let data = req.body; + processRequest(res, data.floID, data.pubKey, data.sign, "User Transacts", { + type: "get_transact", + timestamp: data.timestamp + }, () => market.getUserTransacts(data.floID)); +} + +function GenerateSink(req, res) { + let data = req.body; + if (data.floID !== floGlobals.adminID) + 
res.status(INVALID.e_code).send(INVALID.str(eCode.ACCESS_DENIED, "Access Denied")); + else if (!data.pubKey) + res.status(INVALID.e_code).send(INVALID.str(eCode.MISSING_PARAMETER, "Public key missing")); + else processRequest(res, data.floID, data.pubKey, data.sign, "Generate Sink", { + type: "generate_sink", + group: data.group, + timestamp: data.timestamp + }, () => sink.generate(data.group)); +} + +function ReshareSink(req, res) { + let data = req.body; + console.debug(data) + if (data.floID !== floGlobals.adminID) + res.status(INVALID.e_code).send(INVALID.str(eCode.ACCESS_DENIED, "Access Denied")); + else if (!data.pubKey) + res.status(INVALID.e_code).send(INVALID.str(eCode.MISSING_PARAMETER, "Public key missing")); + else if (!floCrypto.validateAddr(data.id)) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_VALUE, `Invalid ID ${data.id}`)); + else processRequest(res, data.floID, data.pubKey, data.sign, "Reshare Sink", { + type: "reshare_sink", + id: data.id, + timestamp: data.timestamp + }, () => sink.reshare(data.id)); +} + +function DiscardSink(req, res) { + let data = req.body; + if (data.floID !== floGlobals.adminID) + res.status(INVALID.e_code).send(INVALID.str(eCode.ACCESS_DENIED, "Access Denied")); + else if (!data.pubKey) + res.status(INVALID.e_code).send(INVALID.str(eCode.MISSING_PARAMETER, "Public key missing")); + else processRequest(res, data.floID, data.pubKey, data.sign, "Discard Sink", { + type: "discard_sink", + id: data.id, + timestamp: data.timestamp + }, () => sink.discard(data.id)); +} + +/* Public Requests */ + +function GetLoginCode(req, res) { + if (!serving) + res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let randID = floCrypto.randString(8, true) + Math.round(Date.now() / 1000); + let hash = Crypto.SHA1(randID + secret); + res.send({ + code: randID, + hash: hash + }); + } +} + +function ListSellOrders(req, res) { + if (!serving) + 
res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let asset = req.query.asset; + if (asset && !market.keys.assets.isTradeable(asset)) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_ASSET_NAME, "Invalid asset parameter")); + else + DB.query("SELECT SellOrder.floID, SellOrder.asset, SellOrder.price, SellOrder.quantity, SellOrder.time_placed FROM SellOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = SellOrder.floID AND UserBalance.asset = SellOrder.asset" + + " WHERE UserBalance.quantity >= SellOrder.quantity" + + (asset ? " AND SellOrder.asset = ?" : "") + + //" GROUP BY SellOrder.id" + + " ORDER BY SellOrder.price ASC, SellOrder.time_placed ASC" + + " LIMIT 100", [asset || null]) + .then(result => res.send(result)) + .catch(error => { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str("Try again later!")); + }); + } + +} + +function ListBuyOrders(req, res) { + if (!serving) + res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let asset = req.query.asset; + if (asset && !market.keys.assets.isTradeable(asset)) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_ASSET_NAME, "Invalid asset parameter")); + else + DB.query("SELECT BuyOrder.floID, BuyOrder.asset, BuyOrder.maxPrice, BuyOrder.quantity, BuyOrder.time_placed FROM BuyOrder" + + " INNER JOIN UserBalance ON UserBalance.floID = BuyOrder.floID AND UserBalance.asset = ?" + + " WHERE UserBalance.quantity >= BuyOrder.maxPrice * BuyOrder.quantity" + + (asset ? " AND BuyOrder.asset = ?" 
: "") + + //" GROUP BY BuyOrder.id" + + " ORDER BY BuyOrder.maxPrice DESC, BuyOrder.time_placed ASC" + + " LIMIT 100", [floGlobals.currency, asset || null]) + .then(result => res.send(result)) + .catch(error => { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str("Try again later!")); + }); + } +} + +function ListTradeTransactions(req, res) { + if (!serving) + res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let asset = req.query.asset; + if (asset && !market.keys.assets.isTradeable(asset)) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_ASSET_NAME, "Invalid asset parameter")); + else + DB.query("SELECT * FROM TradeTransactions" + + (asset ? " WHERE asset = ?" : "") + + " ORDER BY tx_time DESC LIMIT 1000", [asset || null]) + .then(result => res.send(result)) + .catch(error => { + console.error(error); + res.status(INTERNAL.e_code).send(INTERNAL.str("Try again later!")); + }); + } +} + +function GetSink(req, res) { + if (!serving) + res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let service = req.query.service; + if (!service) + service = serviceList.TRADE; //By default use TRADE service + else if (!(Object.values(serviceList).includes(service))) + res.status(INVALID.e_code).send(INVALID.str(eCode.INVALID_VALUE, "Invalid service parameter")); + else { + let group; + switch (service) { + case serviceList.TRADE: group = keys.sink_groups.TRADE; break; + } + res.send(keys.sink_chest.active_pick(group)); + } + } +} + +function GetRateHistory(req, res) { + if (!serving) + res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString()); + else { + let asset = req.query.asset, + duration = req.query.duration || ""; + market.getRateHistory(asset, duration) + .then(result => res.send(result)) + .catch(error => { + if (error instanceof INVALID) + res.status(INVALID.e_code).send(error.toString()); + else { + console.error(error); + 
res.status(INTERNAL.e_code).send(INTERNAL.str("Unable to process! Try again later!"));
                }
            });
    }
}

/*
//NOTE(review): kept for reference — disabled along with market.getTransactionDetails
function GetTransaction(req, res) {
    if (!serving)
        res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString());
    else {
        let txid = req.query.txid;
        if (!txid)
            res.status(INVALID.e_code).send(INVALID.str(eCode.MISSING_PARAMETER, "txid (transactionID) parameter missing"));
        else market.getTransactionDetails(txid)
            .then(result => res.send(result))
            .catch(error => {
                if (error instanceof INVALID)
                    res.status(INVALID.e_code).send(error.toString());
                else {
                    console.error(error);
                    res.status(INTERNAL.e_code).send(INTERNAL.str("Unable to process! Try again later!"));
                }
            });
    }
}
*/

/*
//NOTE(review): disabled endpoint; the `asset = req.query.asset || req.query.asset`
//line looks like a typo (probably meant a fallback query key) — verify before re-enabling
function GetBalance(req, res) {
    if (!serving)
        res.status(INVALID.e_code).send(INCORRECT_SERVER_ERROR.toString());
    else {
        let floID = req.query.floID || req.query.addr,
            asset = req.query.asset || req.query.asset;
        market.getBalance(floID, asset)
            .then(result => res.send(result))
            .catch(error => {
                if (error instanceof INVALID)
                    res.status(INVALID.e_code).send(error.toString());
                else {
                    console.error(error);
                    res.status(INTERNAL.e_code).send(INTERNAL.str("Unable to process! 
Try again later!")); + } + }); + } +} +*/ + +module.exports = { + GetLoginCode, + Login, + Logout, + PlaceBuyOrder, + PlaceSellOrder, + CancelOrder, + //TransferAsset, + ListSellOrders, + ListBuyOrders, + ListTradeTransactions, + GetRateHistory, + //GetTransaction, + //GetBalance, + GetSink, + Account, + DepositAsset, + WithdrawAsset, + GetUserTransacts, + //admin control + GenerateSink, + ReshareSink, + DiscardSink, + + set secret(s) { + secret = s; + }, + + pause() { + serving = false; + background.periodicProcess.stop(); + }, + resume() { + serving = true; + background.periodicProcess.start(); + } +}; \ No newline at end of file diff --git a/src/set_globals.js b/src/set_globals.js new file mode 100644 index 0000000..0bb2d5e --- /dev/null +++ b/src/set_globals.js @@ -0,0 +1,28 @@ +'use strict'; +//fetch for node js (used in floBlockchainAPI.js) +global.fetch = require("node-fetch"); + +//Set browser paramaters from param.json (or param-default.json) +var param; +try { + param = require('../args/param.json'); +} catch { + param = require('../args/param-default.json'); +} finally { + for (let p in param) + global[p] = param[p]; +} + +global.toStandardDecimal = num => parseFloat((parseInt(num * 1e8) * 1e-8).toFixed(8)) + +if (!process.argv.includes("--debug")) + global.console.debug = () => null; + +/* +//Trace the debug logs in node js +var debug = console.debug; +console.debug = function() { + debug.apply(console, arguments); + console.trace(); +}; +*/ \ No newline at end of file diff --git a/start.js b/start.js new file mode 100644 index 0000000..32915f3 --- /dev/null +++ b/start.js @@ -0,0 +1,2 @@ +const start = require('./src/main'); +start(); \ No newline at end of file