diff --git a/args/schema.sql b/args/schema.sql index 0b7fcf4..a7746e4 100644 --- a/args/schema.sql +++ b/args/schema.sql @@ -285,7 +285,7 @@ CREATE TABLE DirectConvert( PRIMARY KEY(id) ); -CREATE TABLE RefundTransact( +CREATE TABLE RefundConvert( id INT NOT NULL AUTO_INCREMENT, floID CHAR(34) NOT NULL, asset_type TINYINT NOT NULL, @@ -317,12 +317,27 @@ CREATE table _backupCache( ); CREATE TABLE sinkShares( - floID CHAR(34) NOT NULL, + num INT NOT NULL AUTO_INCREMENT, share TEXT, time_stored TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(num) +); + +CREATE TABLE discardedSinks( + id INT AUTO_INCREMENT, + floID CHAR(34) NOT NULL, + discard_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + KEY(id), PRIMARY KEY(floID) ); +CREATE TRIGGER discardedSinks_I AFTER INSERT ON discardedSinks +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; +CREATE TRIGGER discardedSinks_U AFTER UPDATE ON discardedSinks +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; +CREATE TRIGGER discardedSinks_D AFTER DELETE ON discardedSinks +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT; + CREATE TRIGGER RequestLog_I AFTER INSERT ON RequestLog FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; CREATE TRIGGER RequestLog_U AFTER UPDATE ON RequestLog @@ -400,12 +415,12 @@ FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('DirectConvert', NEW.id) O CREATE TRIGGER DirectConvert_D AFTER DELETE ON DirectConvert FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('DirectConvert', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT; -CREATE TRIGGER RefundTransact_I AFTER INSERT ON RefundTransact -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; -CREATE TRIGGER RefundTransact_U AFTER UPDATE ON RefundTransact -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; -CREATE TRIGGER RefundTransact_D AFTER DELETE ON RefundTransact -FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT; +CREATE TRIGGER RefundConvert_I AFTER INSERT ON RefundConvert +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; +CREATE TRIGGER RefundConvert_U AFTER UPDATE ON RefundConvert +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; +CREATE TRIGGER RefundConvert_D AFTER DELETE ON RefundConvert +FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT; CREATE TRIGGER UserTag_I AFTER INSERT ON UserTag FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT; diff --git a/args/truncateAll.sql b/args/truncateAll.sql index b289f75..402a1df 100644 --- a/args/truncateAll.sql +++ b/args/truncateAll.sql @@ -18,7 +18,7 @@ TRUNCATE CloseBondTransact; TRUNCATE CloseFundTransact; TRUNCATE ConvertFund; TRUNCATE DirectConvert; -TRUNCATE RefundTransact; +TRUNCATE RefundConvert; /* Blockchain data */ TRUNCATE LastTx; diff --git a/src/_constants.js b/src/_constants.js index 9efffa8..5b248b2 100644 --- a/src/_constants.js +++ b/src/_constants.js @@ -6,10 +6,17 @@ module.exports = { SIGN_EXPIRE_TIME: 5 * 60 * 1000, //5 mins MAX_SESSION_TIMEOUT: 30 * 24 * 60 * 60 * 1000, //30 days }, - market: { + background: { PERIOD_INTERVAL: 5 * 60 * 1000, //5 min, WAIT_TIME: 2 * 60 * 1000, //2 mins, REQUEST_TIMEOUT: 24 * 60 * 60 * 1000, //1 day + }, + keys: { + SHARES_PER_NODE: 8, + SHARE_THRESHOLD: 50 / 100, //50% + DISCARD_COOLDOWN: 24 * 60 * 60 * 1000, //1 day + }, + market: { LAUNCH_SELLER_TAG: "launch-seller", MAXIMUM_LAUNCH_SELL_CHIPS: 100000, TRADE_HASH_PREFIX: "z1", @@ -29,7 +36,6 @@ module.exports = { MIN_FUND: 0.3 // 30% }, backup: { - SHARE_THRESHOLD: 50 / 100, //50% HASH_N_ROW: 100, BACKUP_INTERVAL: 5 * 60 * 1000, //5 min BACKUP_SYNC_TIMEOUT: 10 * 60 * 1000, //10 mins diff --git a/src/background.js b/src/background.js index 5962dc5..8b264ca 100644 --- a/src/background.js +++ b/src/background.js @@ -1,15 +1,22 @@ 'use strict'; + +const keys = require('./keys'); const blockchain = require('./blockchain'); const conversion_rates = require('./services/conversion').getRate; const bond_util = require('./services/bonds').util; const fund_util = require('./services/bobs-fund').util; const pCode = require('../docs/scripts/floExchangeAPI').processCode; const DB = require("./database"); +const coupling = require('./coupling'); +const price = require('./price'); +const { + PERIOD_INTERVAL, + REQUEST_TIMEOUT +} = require('./_constants')['background']; const { LAUNCH_SELLER_TAG, - MAXIMUM_LAUNCH_SELL_CHIPS, - REQUEST_TIMEOUT, + MAXIMUM_LAUNCH_SELL_CHIPS } = require('./_constants')["market"]; var assetList; //container and allowed assets @@ -20,7 +27,7 @@ const verifyTx = {}; function confirmDepositFLO() { DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, pCode.STATUS_PENDING]).then(results => { results.forEach(r => { - verifyTx.FLO(r.floID, r.txid).then(amount => { + verifyTx.FLO(r.floID, r.txid, keys.sink_groups.EXCHANGE).then(amount => { addSellChipsIfLaunchSeller(r.floID, amount).then(txQueries => { txQueries.push(updateBalance.add(r.floID, "FLO", amount)); txQueries.push(["UPDATE VaultTransactions SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id]]); @@ -38,7 +45,7 @@ function confirmDepositFLO() { }).catch(error => console.error(error)) } -verifyTx.FLO = function (sender, txid) { +verifyTx.FLO = function (sender, txid, group) { return new Promise((resolve, reject) => { floBlockchainAPI.getTx(txid).then(tx => { let vin_sender = tx.vin.filter(v => v.addr === sender) @@ -50,7 +57,7 @@ verifyTx.FLO = function (sender, txid) { return reject([false, "Transaction not included in any block yet"]); if (!tx.confirmations) return reject([false, "Transaction not confirmed yet"]); - let amount = tx.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); + let amount = tx.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); if (amount == 0) return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) else @@ -95,7 +102,7 @@ function addSellChipsIfLaunchSeller(floID, quantity) { function confirmDepositToken() { DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_TOKEN, pCode.STATUS_PENDING]).then(results => { results.forEach(r => { - verifyTx.token(r.floID, r.txid).then(({ token, amount, flo_amount }) => { + verifyTx.token(r.floID, r.txid, keys.sink_groups.EXCHANGE).then(({ token, amount, flo_amount }) => { DB.query("SELECT id FROM VaultTransactions where floID=? AND mode=? AND asset=? AND asset_type=? AND txid=?", [r.floID, pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, r.txid]).then(result => { let txQueries = []; //Add the FLO balance if necessary @@ -119,7 +126,7 @@ function confirmDepositToken() { }).catch(error => console.error(error)) } -verifyTx.token = function (sender, txid, currencyOnly = false) { +verifyTx.token = function (sender, txid, group, currencyOnly = false) { return new Promise((resolve, reject) => { floTokenAPI.getTx(txid).then(tx => { if (tx.parsedFloData.type !== "transfer") @@ -135,7 +142,7 @@ verifyTx.token = function (sender, txid, currencyOnly = false) { let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender) if (!vin_sender.length) return reject([true, "Transaction not sent by the sender"]); - let flo_amount = tx.transactionDetails.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); + let flo_amount = tx.transactionDetails.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); if (flo_amount == 0) return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) else @@ -188,7 +195,7 @@ function confirmVaultWithdraw() { }).catch(error => console.error(error)); } -verifyTx.BTC = function (sender, txid) { +verifyTx.BTC = function (sender, txid, group) { return new Promise((resolve, reject) => { btcOperator.getTx(txid).then(tx => { let vin_sender = tx.inputs.filter(v => floCrypto.isSameAddr(v.address, sender)) @@ -201,7 +208,7 @@ verifyTx.BTC = function (sender, txid) { if (!tx.confirmations) return reject([false, "Transaction not confirmed yet"]); let amount = tx.outputs.reduce((a, v) => - blockchain.chests.includes(floCrypto.toFloID(v.address, { bech: [coinjs.bech32.version] })) ? a + parseFloat(v.value) : a, 0); + keys.sink_chest.includes(group, floCrypto.toFloID(v.address, { bech: [coinjs.bech32.version] })) ? a + parseFloat(v.value) : a, 0); if (amount == 0) return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) else @@ -213,7 +220,7 @@ verifyTx.BTC = function (sender, txid) { function verifyConvert() { //Set all timeout convert request to refund mode (thus, asset will be refund if tx gets confirmed later) let req_timeout = new Date(Date.now() - REQUEST_TIMEOUT), - to_refund_sql = "INSERT INTO RefundTransact (floID, in_txid, asset_type, asset, r_status)" + + to_refund_sql = "INSERT INTO RefundConvert (floID, in_txid, asset_type, asset, r_status)" + " SELECT floID, in_txid, ? AS asset_type, ? AS asset, r_status" + " WHERE r_status=? AND locktime { results.forEach(r => { if (r.mode == pCode.CONVERT_MODE_GET) { - verifyTx.token(r.floID, r.in_txid, true).then(({ amount }) => { + verifyTx.token(r.floID, r.in_txid, keys.sink_groups.CONVERT, true).then(({ amount }) => { if (r.amount !== amount) throw ([true, "Transaction amount mismatched in blockchain"]); conversion_rates.BTC_INR().then(rate => { @@ -237,7 +244,7 @@ function verifyConvert() { .then(_ => null).catch(error => console.error(error)); }); } else if (r.mode == pCode.CONVERT_MODE_PUT) { - verifyTx.BTC(r.floID, r.in_txid).then(quantity => { + verifyTx.BTC(r.floID, r.in_txid, keys.sink_groups.CONVERT).then(quantity => { if (r.quantity !== quantity) throw ([true, "Transaction quantity mismatched in blockchain"]); conversion_rates.BTC_INR().then(rate => { @@ -293,7 +300,7 @@ function verifyConvertFundDeposit() { DB.query("SELECT id, mode, txid, coin FROM ConvertFund WHERE r_status=? AND coin=?", [pCode.STATUS_PROCESSING, "BTC"]).then(results => { results.forEach(r => { if (r.mode == pCode.CONVERT_MODE_GET) { //deposit currency - verifyTx.token(floGlobals.adminID, r.txid, true).then(({ amount }) => { + verifyTx.token(floGlobals.adminID, r.txid, keys.sink_groups.CONVERT, true).then(({ amount }) => { DB.query("UPDATE ConvertFund SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id]) .then(result => console.info(`Deposit-fund ${amount} ${floGlobals.currency} successful`)) .catch(error => console.error(error)); @@ -304,7 +311,7 @@ function verifyConvertFundDeposit() { .then(_ => null).catch(error => console.error(error)); }); } else if (r.mode == pCode.CONVERT_MODE_PUT) {//deposit coin - verifyTx.BTC(floGlobals.adminID, r.txid).then(quantity => { + verifyTx.BTC(floGlobals.adminID, r.txid, keys.sink_groups.CONVERT).then(quantity => { DB.query("UPDATE ConvertFund SET r_status=?, quantity=? WHERE id=?", [pCode.STATUS_SUCCESS, quantity, r.id]) .then(result => console.info(`Deposit-fund ${quantity} ${r.coin} successful`)) .catch(error => console.error(error)); @@ -354,67 +361,47 @@ function confirmConvertFundWithdraw() { }).catch(error => console.error(error)) } -function verifyRefund() { - DB.query("SELECT id, floID, asset_type, asset, in_txid FROM RefundTransact WHERE r_status=?", [pCode.STATUS_PENDING]).then(results => { +function verifyConvertRefund() { + DB.query("SELECT id, floID, asset_type, asset, in_txid FROM RefundConvert WHERE r_status=?", [pCode.STATUS_PENDING]).then(results => { results.forEach(r => { if (r.ASSET_TYPE_COIN) { - if (r.asset == "FLO") - verifyTx.FLO(r.floID, r.in_txid) - .then(amount => blockchain.refundTransact.init(r.floID, r.asset, amount, r.id)) + if (r.asset == "BTC") //Convert is only for BTC right now + verifyTx.BTC(r.floID, r.in_txid, keys.sink_groups.CONVERT) + .then(amount => blockchain.refundConvert.init(r.floID, r.asset, amount, r.id)) .catch(error => { console.error(error); if (error[0]) - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) - .then(_ => null).catch(error => console.error(error)); - }); - else if (r.asset == "BTC") - verifyTx.BTC(r.floID, r.in_txid) - .then(amount => blockchain.refundTransact.init(r.floID, r.asset, amount, r.id)) - .catch(error => { - console.error(error); - if (error[0]) - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) + DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) .then(_ => null).catch(error => console.error(error)); }); } else if (r.ASSET_TYPE_TOKEN) - verifyTx.token(r.floID, r.in_txid).then(({ token, amount }) => { - if (token !== r.asset) - throw ([true, "Transaction token mismatched"]); - else - blockchain.refundTransact.init(r.floID, token, amount, r.id); + verifyTx.token(r.floID, r.in_txid, keys.sink_groups.CONVERT, true).then(({ amount }) => { + blockchain.refundConvert.init(r.floID, floGlobals.currency, amount, r.id); }).catch(error => { console.error(error); if (error[0]) - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) + DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id]) .then(_ => null).catch(error => console.error(error)); }); }) }).catch(error => console.error(error)) } -function retryRefund() { - DB.query("SELECT id, floID, asset, amount FROM RefundTransact WHERE r_status=?", [pCode.STATUS_PROCESSING]).then(results => { - results.forEach(r => blockchain.refundTransact.retry(r.floID, r.asset, r.amount, r.id)) +function retryConvertRefund() { + DB.query("SELECT id, floID, asset, amount FROM RefundConvert WHERE r_status=?", [pCode.STATUS_PROCESSING]).then(results => { + results.forEach(r => blockchain.refundConvert.retry(r.floID, r.asset, r.amount, r.id)) }).catch(error => console.error(error)) } -function confirmRefund() { - DB.query("SELECT * FROM RefundTransact WHERE r_status=?", [pCode.STATUS_CONFIRMATION]).then(results => { - results.forEach(r => { //TODO +function confirmConvertRefund() { + DB.query("SELECT * FROM RefundConvert WHERE r_status=?", [pCode.STATUS_CONFIRMATION]).then(results => { + results.forEach(r => { if (r.ASSET_TYPE_COIN) { - if (r.asset == "FLO") - floBlockchainAPI.getTx(r.out_txid).then(tx => { - if (!tx.blockheight || !tx.confirmations) //Still not confirmed - return; - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) - .then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`)) - .catch(error => console.error(error)) - }).catch(error => console.error(error)); - else if (r.asset == "BTC") + if (r.asset == "BTC") //Convert is only for BTC right now btcOperator.getTx(r.out_txid).then(tx => { if (!tx.blockhash || !tx.confirmations) //Still not confirmed return; - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) + DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) .then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`)) .catch(error => console.error(error)) }).catch(error => console.error(error)); @@ -422,7 +409,7 @@ function confirmRefund() { floTokenAPI.getTx(r.out_txid).then(tx => { if (!tx.transactionDetails.blockheight || !tx.transactionDetails.confirmations) //Still not confirmed return; - DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) + DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id]) .then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`)) .catch(error => console.error(error)); }).catch(error => console.error(error)); @@ -443,7 +430,7 @@ function confirmBondClosing() { if (!tx.blockhash || !tx.confirmations) //Still not confirmed return; let closeBondString = bond_util.stringify.end(r.bond_id, r.end_date, r.btc_net, r.usd_net, r.amount, r.ref_sign, r.txid); - floBlockchainAPI.writeData(global.myFloID, closeBondString, global.myPrivKey, bond_util.config.adminID).then(txid => { + floBlockchainAPI.writeData(keys.node_id, closeBondString, keys.node_priv, bond_util.config.adminID).then(txid => { DB.query("UPDATE CloseBondTransact SET r_status=?, close_id=? WHERE id=?", [pCode.STATUS_SUCCESS, txid, r.id]) .then(result => console.info("Bond closed:", r.bond_id)) .catch(error => console.error(error)); @@ -466,7 +453,7 @@ function confirmFundClosing() { if (!tx.blockhash || !tx.confirmations) //Still not confirmed return; let closeFundString = fund_util.stringify.end(r.fund_id, r.floID, r.end_date, r.btc_net, r.usd_net, r.amount, r.ref_sign, r.txid); - floBlockchainAPI.writeData(global.myFloID, closeFundString, global.myPrivKey, fund_util.config.adminID).then(txid => { + floBlockchainAPI.writeData(keys.node_id, closeFundString, keys.node_priv, fund_util.config.adminID).then(txid => { DB.query("UPDATE CloseFundTransact SET r_status=?, close_id=? WHERE id=?", [pCode.STATUS_SUCCESS, txid, r.id]) .then(result => console.info("Fund investment closed:", r.fund_id, r.floID)) .catch(error => console.error(error)); @@ -476,34 +463,75 @@ function confirmFundClosing() { }).catch(error => console.error(error)) } +//Periodic Process + function processAll() { //deposit-withdraw asset balance - confirmDepositFLO(); - confirmDepositToken(); - retryVaultWithdrawal(); - confirmVaultWithdraw(); + if (keys.sink_chest.list(keys.sink_groups.EXCHANGE).length) { + confirmDepositFLO(); + confirmDepositToken(); + retryVaultWithdrawal(); + confirmVaultWithdraw(); + } //convert service - verifyConvert(); - retryConvert(); - confirmConvert(); - verifyConvertFundDeposit(); - retryConvertFundWithdraw(); - confirmConvertFundWithdraw(); + if (keys.sink_chest.list(keys.sink_groups.CONVERT).length) { + verifyConvert(); + retryConvert(); + confirmConvert(); + verifyConvertFundDeposit(); + retryConvertFundWithdraw(); + confirmConvertFundWithdraw(); + verifyConvertRefund(); + retryConvertRefund(); + confirmConvertRefund(); + } //blockchain-bond service - retryBondClosing(); - confirmBondClosing(); + if (keys.sink_chest.list(keys.sink_groups.BLOCKCHAIN_BONDS).length) { + retryBondClosing(); + confirmBondClosing(); + } //bob's fund service - retryFundClosing(); - confirmFundClosing(); - //refund transactions - verifyRefund(); - retryRefund(); - confirmRefund(); + if (keys.sink_chest.list(keys.sink_groups.EXCHANGE).length) { + retryFundClosing(); + confirmFundClosing(); + } +} + +var lastSyncBlockHeight = 0; + +function periodicProcess() { + floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => { + if (lastSyncBlockHeight < result.blocks[0].height) { + lastSyncBlockHeight = result.blocks[0].height; + processAll(); + console.log("Last Block :", lastSyncBlockHeight); + } + }).catch(error => console.error(error)); +} + +function periodicProcess_start() { + periodicProcess_stop(); + periodicProcess(); + assetList.forEach(asset => coupling.initiate(asset, true)); + price.storeHistory.start(); + periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL); +} + +function periodicProcess_stop() { + if (periodicProcess.instance !== undefined) { + clearInterval(periodicProcess.instance); + delete periodicProcess.instance; + } + coupling.stopAll(); + price.storeHistory.stop(); } module.exports = { blockchain, - process: processAll, + periodicProcess: { + start: periodicProcess_start, + stop: periodicProcess_stop + }, set assetList(assets) { assetList = assets; }, diff --git a/src/backup/head.js b/src/backup/head.js index 7647388..5ec9b8e 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -1,93 +1,74 @@ 'use strict'; +const keys = require('../keys'); const K_Bucket = require('../../docs/scripts/floExchangeAPI').K_Bucket; const slave = require('./slave'); const sync = require('./sync'); const WebSocket = require('ws'); -const DB = require("../database"); -const { - SHARE_THRESHOLD -} = require("../_constants")["backup"]; +const { BACKUP_INTERVAL } = require("../_constants")["backup"]; +const { DISCARD_COOLDOWN } = require("../_constants")["keys"]; -var app, wss, tokenList; //Container for app and wss +var _app, _wss, tokenList; //Container for app and wss var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list const connectedSlaves = {}, shares_collected = {}, - shares_pending = {}; + shares_pending = {}, + discarded_sinks = []; + var _mode = null; const SLAVE_MODE = 0, MASTER_MODE = 1; -const sinkList = {}; - -const chests = { - get list() { - return Object.keys(sinkList); - }, - get activeList() { - let result = []; - for (let id in sinkList) - if (sinkList[id]) - result.push(id); - return result; - }, - includes(id) { - return (id in sinkList); - }, - isActive(id) { - return (id in sinkList && sinkList[id] !== null); - }, - get pick() { - let sinks = Object.keys(sinkList), - i = floCrypto.randInt(0, sinks.length); - return sinks[i]; - } -}; - //Shares function generateShares(sinkKey) { - let nextNodes = nodeKBucket.nextNode(global.myFloID, null), + let nextNodes = nodeKBucket.nextNode(keys.node_id, null), aliveNodes = Object.keys(connectedSlaves); - nextNodes.unshift(global.myFloID); - aliveNodes.unshift(global.myFloID); - let N = nextNodes.length + 1, - th = Math.ceil(aliveNodes.length * SHARE_THRESHOLD) + 1, - shares, refShare, mappedShares = {}; - shares = floCrypto.createShamirsSecretShares(sinkKey, N, th); - refShare = shares.pop(); + nextNodes.unshift(keys.node_id); + aliveNodes.unshift(keys.node_id); + let shares, mappedShares = {}; + shares = keys.generateShares(sinkKey, nextNodes.length, aliveNodes.length); for (let i in nextNodes) - mappedShares[nextNodes[i]] = [refShare, shares[i]].join("|"); + mappedShares[nextNodes[i]] = shares[i]; return mappedShares; } -function sendShare(ws, sinkID, keyShare) { - if (shares_pending[sinkID][ws.floID] === keyShare) //this should always be true unless there is an error - delete shares_pending[sinkID][ws.floID]; //delete the share after sending it to respective slave - ws.send(JSON.stringify({ +function sendShares(ws, sinkID) { + if (!(sinkID in shares_pending) || !(ws.floID in shares_pending[sinkID].shares)) + return; + let { ref, group } = shares_pending[sinkID], + shares = shares_pending[sinkID].shares[ws.floID]; + delete shares_pending[sinkID].shares[ws.floID]; //delete the share after sending it to respective slave + shares.forEach(s => ws.send(JSON.stringify({ command: "SINK_SHARE", - sinkID, - keyShare: floCrypto.encryptData(keyShare, ws.pubKey) - })); + sinkID, ref, group, + share: floCrypto.encryptData(s, ws.pubKey) + }))); } -function sendSharesToNodes(sinkID, shares) { - shares_pending[sinkID] = shares; +function sendSharesToNodes(sinkID, group, shares) { + if (discarded_sinks.includes(sinkID)) { //sinkID is discarded, abort the new shares + let i = discarded_sinks.findIndex(sinkID); + discarded_sinks.splice(i, 1); + return; + } + let ref = Date.now(); + shares_pending[sinkID] = { shares, group, ref }; + if (keys.node_id in shares) { + shares.forEach(s => keys.addShare(group, sinkID, ref, s)) + delete shares_pending[sinkID].shares[keys.node_id]; + } for (let node in shares) if (node in connectedSlaves) - sendShare(connectedSlaves[node], sinkID, shares[node]); - if (global.myFloID in shares) { - slave.storeShare(sinkID, shares[global.myFloID], false); - delete shares_pending[sinkID][global.myFloID]; - } - sinkList[sinkID] = Date.now(); + sendShares(connectedSlaves[node], sinkID); + keys.sink_chest.set_id(group, sinkID, ref); } -function requestShare(ws, sinkID) { +function requestShare(ws, group, sinkID) { ws.send(JSON.stringify({ command: "SEND_SHARE", - sinkID: sinkID, - pubKey: global.myPubKey + group, sinkID, + pubKey: keys.node_pub })); } @@ -126,20 +107,16 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) { } */ -function collectAndCall(sinkID, callback, timeout = null) { +function collectAndCall(group, sinkID, callback, timeout = null) { if (!(callback instanceof Function)) throw Error("callback should be a function"); if (!(sinkID in shares_collected)) { //if not already collecting shares for sinkID, then initiate collection - shares_collected[sinkID] = { - callbacks: [], - shares: {} - }; + shares_collected[sinkID] = { group, ref: 0, callbacks: [], shares: {} }; for (let floID in connectedSlaves) - requestShare(connectedSlaves[floID], sinkID); - DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => { - if (result.length) - collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey)) - }).catch(error => console.error(error)) + requestShare(connectedSlaves[floID], group, sinkID); + keys.getShares(group, sinkID) + .then(({ ref, shares }) => shares.forEach(s => collectShares(sinkID, ref, s))) + .catch(error => console.error(error)) } shares_collected[sinkID].callbacks.push(callback); if (timeout) @@ -153,14 +130,22 @@ function collectAndCall(sinkID, callback, timeout = null) { collectAndCall.isAlive = (sinkID, callbackRef) => (sinkID in shares_collected && shares_collected[sinkID].callbacks.indexOf(callbackRef) != -1); -function collectShares(floID, sinkID, share) { +function collectShares(sinkID, ref, share) { if (_mode !== MASTER_MODE) return console.warn("Not serving as master"); if (!(sinkID in shares_collected)) return console.error("Something is wrong! Slaves are sending sinkID thats not been collected"); - shares_collected[sinkID].shares[floID] = share.split("|"); + if (shares_collected[sinkID].ref > ref) + return console.debug("Received expired share"); + else if (shares_collected[sinkID].ref < ref) { + shares_collected[sinkID].ref = ref; + shares_collected[sinkID].shares = []; + } + if (shares_collected[sinkID].shares.includes(share)) + return console.debug("Received duplicate share"); + shares_collected[sinkID].shares.push(share); try { - let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares))); + let sinkKey = floCrypto.retrieveShamirSecret(shares_collected[sinkID].shares); if (floCrypto.verifyPrivKey(sinkKey, sinkID)) { console.debug("Shares collected successfully for", sinkID); shares_collected[sinkID].callbacks.forEach(fn => fn instanceof Function ? fn(sinkKey) : null); @@ -182,11 +167,11 @@ function connectWS(floID) { function connectToMaster(i = 0, init = false) { if (i >= nodeList.length) { - console.error("No master is found, and myFloID is not in list. This should not happen!"); + console.error("No master is found and Node not in list. This should not happen!"); process.exit(1); } let floID = nodeList[i]; - if (floID === myFloID) + if (floID === keys.node_id) serveAsMaster(init); else connectWS(floID).then(ws => { @@ -199,32 +184,16 @@ function connectToMaster(i = 0, init = false) { }); } -//Node becomes master -function serveAsMaster(init) { - console.info('Starting master process'); - slave.stop(); - _mode = MASTER_MODE; - informLiveNodes(init); - app.resume(); -} - -function serveAsSlave(ws, init) { - console.info('Starting slave process'); - app.pause(); - slave.start(ws, init); - _mode = SLAVE_MODE; -} - function informLiveNodes(init) { let message = { - floID: global.myFloID, + floID: keys.node_id, type: "UPDATE_MASTER", - pubKey: global.myPubKey, + pubKey: keys.node_pub, req_time: Date.now() }; - message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); + message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv); message = JSON.stringify(message); - let nodes = nodeList.filter(n => n !== global.myFloID); + let nodes = nodeList.filter(n => n !== keys.node_id); Promise.allSettled(nodes.map(n => connectWS(n))).then(result => { let flag = false; for (let i in result) @@ -237,20 +206,29 @@ function informLiveNodes(init) { console.warn(`Node(${nodes[i]}) is offline`); if (init && flag) syncRequest(); - DB.query("SELECT floID, share FROM sinkShares ORDER BY time_stored DESC").then(result => { - if (result.length) - result.forEach(r => reconstructShares(r.floID)); - else if (!flag) { + keys.getStoredList().then(result => { + if (Object.keys(result).length) { + keys.getDiscardedList().then(discarded_list => { + for (let group in result) + result.group.forEach(id => { + if (!(id in discarded_list)) + reconstructShares(group, id); + }); + }).catch(error => console.error(error)) + } else if (!flag) { console.log("Starting the exchange..."); - let newSink = floCrypto.generateNewID(); - console.debug("Generated sink:", newSink.floID, newSink.privKey); - sendSharesToNodes(newSink.floID, generateShares(newSink.privKey)); + //generate a sinkID for each group in starting list + keys.sink_groups.generate_list.forEach(group => { + let newSink = floCrypto.generateNewID(); + console.debug("Generated sink:", group, newSink.floID); + sendSharesToNodes(newSink.floID, group, generateShares(newSink.privKey)); + }) } }).catch(error => console.error(error)); }); } -function syncRequest(cur = global.myFloID) { +function syncRequest(cur = keys.node_id) { //Sync data from next available node let nextNode = nodeKBucket.nextNode(cur); if (!nextNode) @@ -261,16 +239,16 @@ function syncRequest(cur = global.myFloID) { } function updateMaster(floID) { - let currentMaster = _mode === MASTER_MODE ? global.myFloID : slave.masterWS.floID; + let currentMaster = _mode === MASTER_MODE ? keys.node_id : slave.masterWS.floID; if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster)) connectToMaster(); } -function reconstructShares(sinkID) { +function reconstructShares(group, sinkID) { if (_mode !== MASTER_MODE) return console.warn("Not serving as master"); - sinkList[sinkID] = null; - collectAndCall(sinkID, sinkKey => sendSharesToNodes(sinkID, generateShares(sinkKey))); + keys.sink_chest.set_id(group, sinkID, null); + collectAndCall(group, sinkID, sinkKey => sendSharesToNodes(sinkID, group, generateShares(sinkKey))); } function slaveConnect(floID, pubKey, ws, sinks) { @@ -282,29 +260,82 @@ function slaveConnect(floID, pubKey, ws, sinks) { //Send shares if need to be delivered for (let sinkID in shares_pending) - if (floID in shares_pending[sinkID]) - sendShare(ws, sinkID, shares_pending[floID]); + if (floID in shares_pending[sinkID].shares) + sendShares(ws, sinkID); //Request shares if any for (let sinkID in shares_collected) - requestShare(ws, sinkID); //if (!(floID in shares_collected[sinkID].shares)) + requestShare(ws, shares_collected[sinkID].group, sinkID); //check if sinks in slaves are present - if (Array.isArray(sinks)) - for (let sinkID of sinks) - if (!(sinkID in sinkList)) - reconstructShares(sinkID); - /* - if (shares_pending === null || //The 1st backup is connected - Object.keys(connectedSlaves).length < Math.pow(SHARE_THRESHOLD, 2) * Object.keys(shares_pending).length) //re-calib shares for better - sendSharesToNodes(sinkID, generateShares(sinkPrivKey)) - */ + if (Array.isArray(sinks)) { + for (let sinkID of sinks) { + if (!keys.sink_chest.includes(shares_collected[sinkID].group, sinkID)) + keys.checkIfDiscarded(sinkID) + .then(result => result === false ? reconstructShares(shares_collected[sinkID].group, sinkID) : null) + .catch(error => console.error(error)) + } + } +} + +function checkForDiscardedSinks() { + let cur_time = Date.now(), + all_sinks = keys.sink_chest.get_all(); + for (let group in all_sinks) + all_sinks[group].forEach(id => keys.checkIfDiscarded(id).then(result => { + console.debug(group, id); //Check if group is correctly mapped, or if its changed by loop + if (result != false) { + if (cur_time - result > DISCARD_COOLDOWN) + keys.sink_chest.rm_id(group, id); + else + keys.sink_chest.set_id(group, id, null); + if (sinkID in shares_collected && !discarded_sinks.includes(sinkID)) + discarded_sinks.push(sinkID); + } + }).catch(error => console.debug(error))) +} + +//Master interval process +function intervalProcess() { + checkForDiscardedSinks(); +} + +intervalProcess.start = () => { + intervalProcess.stop(); + intervalProcess.instance = setInterval(intervalProcess, BACKUP_INTERVAL); +} + +intervalProcess.stop = () => { + if (intervalProcess.instance !== undefined) { + clearInterval(intervalProcess.instance); + delete intervalProcess.instance; + } +} + +//Node becomes master +function serveAsMaster(init) { + console.info('Starting master process'); + slave.stop(); + _mode = MASTER_MODE; + keys.sink_chest.reset(); + intervalProcess.start(); + informLiveNodes(init); + _app.resume(); +} + +//Node becomes slave +function serveAsSlave(ws, init) { + console.info('Starting slave process'); + intervalProcess.stop(); + _app.pause(); + slave.start(ws, init); + _mode = SLAVE_MODE; } //Transmistter function startBackupTransmitter(server) { - wss = new WebSocket.Server({ + _wss = new WebSocket.Server({ server }); - wss.on('connection', ws => { + _wss.on('connection', ws => { ws.on('message', message => { //verify if from a backup node try { @@ -335,7 +366,7 @@ function startBackupTransmitter(server) { slaveConnect(request.floID, request.pubKey, ws, request.sinks); break; case "SINK_SHARE": - collectShares(request.floID, request.sinkID, floCrypto.decryptData(request.share, global.myPrivKey)) + collectShares(request.sinkID, request.ref, floCrypto.decryptData(request.share, keys.node_priv)) default: invalid = "Invalid Request Type"; } @@ -361,16 +392,15 @@ function startBackupTransmitter(server) { }); } -function initProcess(a) { - app = a; - app.chests = chests; - app.collectAndCall = collectAndCall; - startBackupTransmitter(app.server); +function initProcess(app) { + _app = app; + startBackupTransmitter(_app.server); connectToMaster(0, true); } module.exports = { init: initProcess, + collectAndCall, set nodeList(list) { nodeURL = list; nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL)); @@ -383,6 +413,6 @@ module.exports = { tokenList = assets.filter(a => a.toUpperCase() !== "FLO"); }, get wss() { - return wss; + return _wss; } }; \ No newline at end of file diff --git a/src/backup/slave.js b/src/backup/slave.js index 55eae2f..689a016 100644 --- a/src/backup/slave.js +++ b/src/backup/slave.js @@ -1,5 +1,6 @@ 'use strict'; +const keys = require("../keys"); const DB = require("../database"); const { @@ -21,18 +22,23 @@ function startSlaveProcess(ws, init) { ws.on('message', processDataFromMaster); masterWS = ws; let sinks_stored = []; - DB.query("SELECT floID FROM sinkShares").then(result => { - sinks_stored = result.map(r => r.floID); + Promise.all([keys.getStoredList(), keys.getDiscardedList()]).then(result => { + let stored_list = result[0], + discarded_list = result[1]; + for (let g in stored_list) + for (let id in stored_list[g]) + if (!(stored_list[g][id] in discarded_list)) + sinks_stored.push(id); }).catch(error => console.error(error)).finally(_ => { //inform master let message = { - floID: global.myFloID, - pubKey: global.myPubKey, + floID: keys.node_id, + pubKey: keys.node_pub, sinks: sinks_stored, req_time: Date.now(), type: "SLAVE_CONNECT" } - message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey); + message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv); ws.send(JSON.stringify(message)); //start sync if (init) @@ -58,14 +64,14 @@ function requestBackupSync(checksum_trigger, ws) { return new Promise((resolve, reject) => { DB.query('SELECT MAX(u_time) as last_time FROM _backup').then(result => { let request = { - floID: global.myFloID, - pubKey: global.myPubKey, + floID: keys.node_id, + pubKey: keys.node_pub, type: "BACKUP_SYNC", last_time: result[0].last_time, checksum: checksum_trigger, req_time: Date.now() }; - request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); ws.send(JSON.stringify(request)); resolve(request); }).catch(error => reject(error)) @@ -134,10 +140,10 @@ function processDataFromMaster(message) { processBackupData(message); else switch (message.command) { case "SINK_SHARE": - storeSinkShare(message.sinkID, message.keyShare); + storeSinkShare(message.group, message.sinkID, message.share, message.ref); break; case "SEND_SHARE": - sendSinkShare(message.sinkID, message.pubKey); + sendSinkShare(message.group, message.sinkID, message.pubKey); break; case "REQUEST_ERROR": console.log(message.error); @@ -150,30 +156,26 @@ function processDataFromMaster(message) { } } -function storeSinkShare(sinkID, keyShare, decrypt = true) { - if (decrypt) - keyShare = floCrypto.decryptData(keyShare, global.myPrivKey) - let encryptedShare = Crypto.AES.encrypt(keyShare, global.myPrivKey); - console.debug(Date.now(), '|sinkID:', sinkID, '|EnShare:', encryptedShare); - DB.query("INSERT INTO sinkShares (floID, share) VALUE (?) ON DUPLICATE KEY UPDATE share=?", [[sinkID, encryptedShare], encryptedShare]) +function storeSinkShare(group, sinkID, share, ref) { + share = floCrypto.decryptData(share, keys.node_priv); + keys.addShare(group, sinkID, ref, share) .then(_ => null).catch(error => console.error(error)); } -function sendSinkShare(sinkID, pubKey) { - DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => { - if (!result.length) - return console.warn(`key-shares for ${sinkID} not found in database!`); - let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey); - let response = { - type: "SINK_SHARE", - sinkID: sinkID, - share: floCrypto.encryptData(share, pubKey), - floID: global.myFloID, - pubKey: global.myPubKey, - req_time: Date.now() - } - response.sign = floCrypto.signData(response.type + "|" + response.req_time, global.myPrivKey); //TODO: strengthen signature - masterWS.send(JSON.stringify(response)); +function sendSinkShare(group, sinkID, pubKey) { + keys.getShares(group, sinkID).then(({ ref, shares }) => { + shares.forEach(s => { + let response = { + type: "SINK_SHARE", + sinkID, ref, + share: floCrypto.encryptData(s, pubKey), + floID: keys.node_id, + pubKey: keys.node_pub, + req_time: Date.now() + } + response.sign = floCrypto.signData(response.type + "|" + response.req_time, keys.node_priv); //TODO: strengthen signature + masterWS.send(JSON.stringify(response)); + }) }).catch(error => console.error(error)); } @@ -356,13 +358,13 @@ function requestHash(tables) { //TODO: resync only necessary data (instead of entire table) let self = requestInstance; let request = { - floID: global.myFloID, - pubKey: global.myPubKey, + floID: keys.node_id, + pubKey: keys.node_pub, type: "HASH_SYNC", tables: tables, req_time: Date.now() }; - request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); self.ws.send(JSON.stringify(request)); self.request = request; self.checksum = null; @@ -411,13 +413,13 @@ function verifyHash(hashes) { function requestTableChunks(tables, ws) { let request = { - floID: global.myFloID, - pubKey: global.myPubKey, + floID: keys.node_id, + pubKey: keys.node_pub, type: "RE_SYNC", tables: tables, req_time: Date.now() }; - request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey); + request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv); ws.send(JSON.stringify(request)); } @@ -427,6 +429,5 @@ module.exports = { }, start: startSlaveProcess, stop: stopSlaveProcess, - storeShare: storeSinkShare, syncRequest: ws => requestInstance.open(ws) } \ No newline at end of file diff --git a/src/blockchain.js b/src/blockchain.js index 138f70e..a20e4ef 100644 --- a/src/blockchain.js +++ b/src/blockchain.js @@ -1,17 +1,25 @@ 'use strict'; const pCode = require('../docs/scripts/floExchangeAPI').processCode; +const { collectAndCall } = require('./backup/head'); +const keys = require('./keys'); const DB = require("./database"); -var collectAndCall; //container for collectAndCall function from backup module -var chests; //container for blockchain ids (where assets are stored) - const TYPE_VAULT = "VAULT", TYPE_CONVERT = "CONVERT", TYPE_CONVERT_POOL = "CONVERT_POOL", - TYPE_REFUND = "REFUND", - TYPE_BOND = "BOND", - TYPE_FUND = "BOB-FUND"; + TYPE_CONVERT_REFUND = "REFUND", + TYPE_BLOCKCHAIN_BOND = "BOND", + TYPE_BOBS_FUND = "BOB-FUND"; + +const SINK_GROUP = { + [TYPE_VAULT]: keys.sink_groups.EXCHANGE, + [TYPE_CONVERT]: keys.sink_groups.CONVERT, + [TYPE_CONVERT_POOL]: keys.sink_groups.CONVERT, + [TYPE_CONVERT_REFUND]: keys.sink_groups.CONVERT, + [TYPE_BLOCKCHAIN_BOND]: keys.sink_groups.BLOCKCHAIN_BONDS, + [TYPE_BOBS_FUND]: keys.sink_groups.BOBS_FUND +} const balance_locked = {}, balance_cache = {}, @@ -19,9 +27,9 @@ const balance_locked = {}, [TYPE_VAULT]: {}, [TYPE_CONVERT]: {}, [TYPE_CONVERT_POOL]: {}, - [TYPE_REFUND]: {}, - [TYPE_BOND]: {}, - [TYPE_FUND]: {} + [TYPE_CONVERT_REFUND]: {}, + [TYPE_BLOCKCHAIN_BOND]: {}, + [TYPE_BOBS_FUND]: {} }; function getBalance(sinkID, asset) { @@ -36,10 +44,10 @@ function getBalance(sinkID, asset) { } } -function getSinkID(quantity, asset, sinkList = null) { +function getSinkID(type, quantity, asset, sinkList = null) { return new Promise((resolve, reject) => { if (!sinkList) - sinkList = chests.list.map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting + sinkList = keys.sink_chest.list(SINK_GROUP[type]).map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting .sort((a, b) => b[1] - a[1]).map(x => x[0]); if (!sinkList.length) return reject(`Insufficient balance in chests for asset(${asset})`); @@ -51,12 +59,12 @@ function getSinkID(quantity, asset, sinkList = null) { if (balance > (quantity + (sinkID in balance_locked ? balance_locked[sinkID][asset] || 0 : 0))) return resolve(sinkID); else - getSinkID(quantity, asset, sinkList) + getSinkID(type, quantity, asset, sinkList) .then(result => resolve(result)) .catch(error => reject(error)) }).catch(error => { console.error(error); - getSinkID(quantity, asset, sinkList) + getSinkID(type, quantity, asset, sinkList) .then(result => resolve(result)) .catch(error => reject(error)) }); @@ -67,9 +75,9 @@ const WITHDRAWAL_MESSAGE = { [TYPE_VAULT]: "(withdrawal from market)", [TYPE_CONVERT]: "(convert coin)", [TYPE_CONVERT_POOL]: "(convert fund)", - [TYPE_REFUND]: "(refund from market)", - [TYPE_BOND]: "(bond closing)", - [TYPE_FUND]: "(fund investment closing)" + [TYPE_CONVERT_REFUND]: "(refund from market)", + [TYPE_BLOCKCHAIN_BOND]: "(bond closing)", + [TYPE_BOBS_FUND]: "(fund investment closing)" } function sendTx(floID, asset, quantity, sinkID, sinkKey, message) { @@ -89,14 +97,14 @@ const updateSyntax = { [TYPE_VAULT]: "UPDATE VaultTransactions SET r_status=?, txid=? WHERE id=?", [TYPE_CONVERT]: "UPDATE DirectConvert SET r_status=?, out_txid=? WHERE id=?", [TYPE_CONVERT_POOL]: "UPDATE ConvertFund SET r_status=?, txid=? WHERE id=?", - [TYPE_REFUND]: "UPDATE RefundTransact SET r_status=?, out_txid=? WHERE id=?", - [TYPE_BOND]: "UPDATE CloseBondTransact SET r_status=?, txid=? WHERE id=?", - [TYPE_FUND]: "UPDATE CloseFundTransact SET r_status=?, txid=? WHERE id=?" + [TYPE_CONVERT_REFUND]: "UPDATE RefundConvert SET r_status=?, out_txid=? WHERE id=?", + [TYPE_BLOCKCHAIN_BOND]: "UPDATE CloseBondTransact SET r_status=?, txid=? WHERE id=?", + [TYPE_BOBS_FUND]: "UPDATE CloseFundTransact SET r_status=?, txid=? WHERE id=?" }; function sendAsset(floID, asset, quantity, type, id) { quantity = global.toStandardDecimal(quantity); - getSinkID(quantity, asset).then(sinkID => { + getSinkID(type, quantity, asset).then(sinkID => { let callback = (sinkKey) => { //Send asset to user via API sendTx(floID, asset, quantity, sinkID, sinkKey, WITHDRAWAL_MESSAGE[type]).then(txid => { @@ -110,7 +118,7 @@ function sendAsset(floID, asset, quantity, type, id) { balance_locked[sinkID][asset] -= quantity; }); } - collectAndCall(sinkID, callback); + collectAndCall(sinkID, callback); //TODO: add timeout to prevent infinite wait callbackCollection[type][id] = callback; if (!(sinkID in balance_locked)) balance_locked[sinkID] = {}; @@ -165,39 +173,30 @@ function convertFundWithdraw_retry(asset, amount, id) { } function bondTransact_retry(floID, amount, btc_rate, usd_rate, id) { - if (id in callbackCollection[TYPE_BOND]) + if (id in callbackCollection[TYPE_BLOCKCHAIN_BOND]) console.debug("A callback is already pending for this Bond closing"); - else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BOND, id); + else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BLOCKCHAIN_BOND, id); } function fundTransact_retry(floID, amount, btc_rate, usd_rate, id) { - if (id in callbackCollection[TYPE_FUND]) + if (id in callbackCollection[TYPE_BOBS_FUND]) console.debug("A callback is already pending for this Fund investment closing"); - else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_FUND, id); + else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BOBS_FUND, id); } -function refundTransact_init(floID, asset, amount, id) { +function refundConvert_init(floID, asset, amount, id) { amount = global.toStandardDecimal(amount); - DB.query("UPDATE RefundTransact SET amount=?, r_status=?, locktime=DEFAULT WHERE id=?", [amount, pCode.STATUS_PROCESSING, id]) - .then(result => sendAsset(floID, asset, amount, TYPE_REFUND, id)) + DB.query("UPDATE RefundConvert SET amount=?, r_status=?, locktime=DEFAULT WHERE id=?", [amount, pCode.STATUS_PROCESSING, id]) + .then(result => sendAsset(floID, asset, amount, TYPE_CONVERT_REFUND, id)) .catch(error => console.error(error)) } -function refundTransact_retry(floID, asset, amount, id) { - if (id in callbackCollection[TYPE_REFUND]) +function refundConvert_retry(floID, asset, amount, id) { + if (id in callbackCollection[TYPE_CONVERT_REFUND]) console.debug("A callback is already pending for this Refund"); - else sendAsset(floID, asset, amount, TYPE_REFUND, id); + else sendAsset(floID, asset, amount, TYPE_CONVERT_REFUND, id); } module.exports = { - set collectAndCall(fn) { - collectAndCall = fn; - }, - get chests() { - return chests; - }, - set chests(c) { - chests = c; - }, withdrawAsset: { init: withdrawAsset_init, retry: withdrawAsset_retry @@ -219,8 +218,8 @@ module.exports = { fundTransact: { retry: fundTransact_retry }, - refundTransact: { - init: refundTransact_init, - retry: refundTransact_retry + refundConvert: { + init: refundConvert_init, + retry: refundConvert_retry } } \ No newline at end of file diff --git a/src/keys.js b/src/keys.js new file mode 100644 index 0000000..db599e5 --- /dev/null +++ b/src/keys.js @@ -0,0 +1,380 @@ +'use strict'; + +const fs = require('fs'); +const path = require('path'); +const DB = require("./database"); + +const { SHARES_PER_NODE, SHARE_THRESHOLD } = require("./_constants")["keys"]; + +const PRIV_EKEY_MIN = 32, + PRIV_EKEY_MAX = 48, + PRIME_FILE_TYPE = 'binary', + INDEX_FILE_TYPE = 'utf-8', + INT_MIN = -2147483648, + INT_MAX = 2147483647, + INDEX_FILE_NAME_LENGTH = 16, + INDEX_FILE_EXT = '.txt', + MIN_DUMMY_FILES = 16, + MAX_DUMMY_FILES = 24, + MIN_DUMMY_SIZE_MUL = 0.5, + MAX_DUMMY_SIZE_MUL = 1.5, + SIZE_FACTOR = 100; + +var node_priv, e_key, node_id, node_pub; //containers for node-key wrapper +const _ = { + get node_priv() { + if (!node_priv || !e_key) + throw Error("keys not set"); + return Crypto.AES.decrypt(node_priv, e_key); + }, + set node_priv(key) { + node_pub = floCrypto.getPubKeyHex(key); + node_id = floCrypto.getFloID(node_pub); + if (!key || !node_pub || !node_id) + throw Error("Invalid Keys"); + let n = floCrypto.randInt(PRIV_EKEY_MIN, PRIV_EKEY_MAX) + e_key = floCrypto.randString(n); + node_priv = Crypto.AES.encrypt(key, e_key); + }, + args_dir: path.resolve(__dirname, '..', '..', 'args'), + index_dir: path.join(this.args_dir, 'indexes'), + prime_file: path.join(this.args_dir, 'prime_index.b'), + get index_file() { + try { + let data = fs.readFileSync(this.prime_file, PRIME_FILE_TYPE), + fname = Crypto.AES.decrypt(data, this.node_priv); + return path.join(this.index_dir, fname + INDEX_FILE_EXT); + } catch (error) { + console.debug(error); + throw Error("Prime-Index Missing/Corrupted"); + } + } +} + +function initialize() { + return new Promise((resolve, reject) => { + fs.readFile(_.prime_file, PRIME_FILE_TYPE, (err, res) => { + var data, cur_filename, new_filename, priv_key; + try { + priv_key = _.node_priv; + } catch (error) { + return reject(error); + } + if (!err) { + if (res.length) { //prime file not empty + try { + cur_filename = Crypto.AES.decrypt(res, priv_key); + } catch (error) { + console.debug(error); + return reject("Prime file corrupted"); + } try { //read data from index file + let tmp = fs.readFileSync(path.join(_.index_dir, cur_filename + INDEX_FILE_EXT), INDEX_FILE_TYPE); + tmp = Crypto.AES.decrypt(tmp, priv_key); + JSON.parse(tmp); //check if data is JSON parse-able + data = tmp; + } catch (error) { + console.debug(error); + return reject("Index file corrupted"); + } + } + } + try { //delete all old dummy files + let files = fs.readdirSync(_.index_dir); + for (const file of files) + if (!cur_filename || file !== cur_filename + INDEX_FILE_EXT) //check if file is current file + fs.unlinkSync(path.join(_.index_dir, file)); + } catch (error) { + console.debug(error); + return reject("Clear index directory failed"); + } try { //create files (dummy and new index file) + let N = floCrypto.randInt(MIN_DUMMY_FILES, MAX_DUMMY_FILES), + k = floCrypto.randInt(0, N); + if (typeof data === 'undefined' || data.length == 0) //no existing data, initialize + data = JSON.stringify({}); + let data_size = data.length; + for (let i = 0; i <= N; i++) { + let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH); + if (i == k) { + new_filename = f_name; + f_data = data; + } else { + let d_size = data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR); + f_data = floCrypto.randString(d_size, false); + } + f_data = Crypto.AES.encrypt(f_data, priv_key); + fs.writeFileSync(path.join(_.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE); + } + } catch (error) { + console.debug(error); + return reject("Index file creation failed"); + } try { //update prime file + let en_filename = Crypto.AES.encrypt(new_filename, priv_key); + fs.writeFileSync(_.prime_file, en_filename, PRIME_FILE_TYPE); + } catch (error) { + console.debug(error); + return reject("Update prime file failed"); + } + if (cur_filename) + fs.unlink(path.join(_.index_dir, file), err => err ? console.debug(err) : null); + resolve("Key management initiated"); + }) + }) +} + +function shuffle() { + readIndexFile().then(data => { + let new_filename, cur_filename = Crypto.AES.decrypt(fs.readFileSync(_.prime_file, PRIME_FILE_TYPE), _.node_priv); + fs.readdir(_.index_dir, (err, files) => { + if (err) + return console.error(err); + data = JSON.stringify(data); + let data_size = data.length; + for (let file of files) { + let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH); + if (file === cur_filename) { + new_filename = f_name; + f_data = data; + } else { + let d_size = data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR); + f_data = floCrypto.randString(d_size, false); + } + f_data = Crypto.AES.encrypt(f_data, _.node_priv); + //rename and rewrite the file + fs.renameSync(path.join(_.args_dir, file + INDEX_FILE_EXT), path.join(_.index_dir, f_name + INDEX_FILE_EXT)); + fs.writeFileSync(path.join(_.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE); + } + //update prime file + if (!new_filename) + throw Error("Index file has not been renamed"); + let en_filename = Crypto.AES.encrypt(new_filename, _.node_priv); + fs.writeFileSync(_.prime_file, en_filename, PRIME_FILE_TYPE); + }) + }).catch(error => console.error(error)) +} + +function readIndexFile() { + return new Promise((resolve, reject) => { + fs.readFile(_.index_file, INDEX_FILE_TYPE, (err, data) => { + if (err) { + console.debug(err); + return reject('Unable to read Index file'); + } + try { + data = JSON.parse(Crypto.AES.decrypt(data, _.node_priv)); + resolve(data); + } catch { + reject("Index file corrupted"); + } + }) + }) +} + +function writeIndexFile(data) { + return new Promise((resolve, reject) => { + let en_data = Crypto.AES.encrypt(JSON.stringify(data), _.node_priv); + fs.writeFile(_.index_file, en_data, INDEX_FILE_TYPE, (err) => { + if (err) { + console.debug(err); + return reject('Unable to write Index file'); + } else resolve("Updated Index file"); + }) + }) +} + +function getShares(group, id, ignoreDiscarded = true) { + return new Promise((resolve, reject) => { + checkIfDiscarded(id).then(result => { + if (ignoreDiscarded && result != false) + return reject("Trying to get share for discarded ID"); + readIndexFile().then(data => { + if (!(group in data)) + reject("Group not found in Index file"); + else if (!(id in data[group])) + reject("ID not found in Index file"); + else { + let ref = data[group][id].shift(); + DB.query("SELECT share FROM sinkShares WHERE num IN (?)", [data[group][id]]) + .then(result => resolve({ ref, shares: result.map(r => Crypto.AES.decrypt(r.share, _.node_priv)) })) + .catch(error => reject(error)) + } + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function storeShareAtRandom(share) { + return new Promise((resolve, reject) => { + let rand = floCrypto.randInt(INT_MIN, INT_MAX); + DB.query("INSERT INTO sinkShares(num, share) VALUE (?)", [[rand, share]]) + .then(result => resolve(result.insertId)).catch(error => { + if (error.code === "ER_DUP_ENTRY") + storeShareAtRandom(share) //try again (with diff rand_num) + .then(result => resolve(result)) + .catch(error => reject(error)) + else + reject(error); + }) + }) +} + +function addShare(group, id, ref, share) { + return new Promise((resolve, reject) => { + checkIfDiscarded(id).then(result => { + if (result != false) + return reject("Trying to store share for discarded ID"); + readIndexFile().then(data => { + if (!(group in data)) + data[group] = {}; + if (!(id in data[group])) + data[group][id] = [ref]; + else if (ref < data[group][id][0]) + return reject("reference is lower than current"); + else if (ref > data[group][id][0]) { + let old_shares = data[group][id]; + data[group][id] = [ref]; + old_shares.shift(); + DB.query("DELETE FROM sinkShares WHERE num in (?", [old_shares])//delete old shares + .then(_ => null).catch(error => console.error(error)); + } + let encrypted_share = Crypto.AES.encrypt(share, _.node_priv); + console.debug(ref, '|sinkID:', sinkID, '|EnShare:', encrypted_share); + storeShareAtRandom(encrypted_share).then(i => { + data[group][id].push(i); + writeIndexFile(data).then(_ => resolve(i)).catch(error => reject(error)); + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }).catch(error => reject(error)) + }) +} + +function generateShares(sinkKey, total_n, min_n) { + let shares = floCrypto.createShamirsSecretShares(sinkKey, total_n * SHARES_PER_NODE, min_n * SHARES_PER_NODE * SHARE_THRESHOLD); + let node_shares = Array(total_n); + for (let i = 0; i < total_n; i++) + node_shares[i] = shares.splice(0, SHARES_PER_NODE); + return node_shares; +} + +function getStoredList(group = null) { + return new Promise((resolve, reject) => { + readIndexFile().then(data => { + if (group !== null) { + if (group in data) + resolve(Object.keys(data.group)); + else + reject("Group not found in Index file"); + } else { + let ids = {}; + for (let group in data) + ids[group] = Object.keys(data.group); + resolve(ids); + } + }).catch(error => reject(error)) + }) +} + +function getDiscardedList() { + return new Promise((resolve, reject) => { + DB.query("SELECT floID, discard_time FROM discardedSinks") + .then(result => resolve(Object.fromEntries(result.map(r => [r.floID, r.discard_time])))) + .catch(error => reject(error)) + }) +} + +function checkIfDiscarded(id) { + return new Promise((resolve, reject) => { + DB.query("SELECT discard_time FROM discardedSinks WHERE floID=?", [id]) + .then(result => resolve(result.length ? result[0].discard_time : false)) + .catch(error => reject(error)) + }) +} + +//Sink groups and chest +const sink_groups = { + get EXCHANGE() { return "exchange" }, + get CONVERT() { return "convert" }, + get BLOCKCHAIN_BONDS() { return "blockchain_bonds" }, + get BOBS_FUND() { return "bobs_fund" }, + get generate_list() { //list to generate when starting exchange + return [this.EXCHANGE, this.CONVERT] + } +}; + +const sink_ids = {}, sink_chest = { + reset() { + for (let i in sink_ids) + delete sink_ids[i]; + }, + set_id(group, id, value) { + if (!(group in sink_ids)) + sink_ids[group] = {}; + sink_ids[group][id] = value; + }, + rm_id(group, id) { + return delete sink_ids[group][id]; + }, + get_id(group, id) { + return sink_ids[group][id]; + }, + list(group) { + return Object.keys(sink_ids[group] || {}); + }, + active_list(group) { + let ids = []; + if (group in sink_ids) + for (let id in sink_ids[group]) + if (sink_ids[group][id]) + ids.push(id); + return ids; + }, + includes(group, id) { + return group in sink_ids ? (id in sink_ids[group]) : null; + }, + isActive(group, id) { + return group in sink_ids ? (id in sink_ids && sink_ids[id]) : null; + }, + pick(group) { + let ids = this.list(group), + i = floCrypto.randInt(0, ids.length); + return ids[i]; + }, + active_pick(group) { + let ids = this.active_list(group), + i = floCrypto.randInt(0, ids.length); + return ids[i]; + }, + get_all() { + let ids = {}; + for (let g in sink_ids) + ids[g] = Object.keys(sink_ids[g]) + return ids; + } +}; + +module.exports = { + init: initialize, + getShares, + addShare, + generateShares, + getStoredList, + getDiscardedList, + checkIfDiscarded, + set node_priv(key) { + _.node_priv = key; + }, + get node_priv() { + return _.node_priv; + }, + get node_id() { + return node_id; + }, + get node_id() { + return node_pub; + }, + get sink_groups() { + return sink_groups; + }, + get sink_chest() { + return sink_chest; + } +} \ No newline at end of file diff --git a/src/main.js b/src/main.js index 26bf63b..97d2d77 100644 --- a/src/main.js +++ b/src/main.js @@ -13,6 +13,7 @@ global.btcOperator = require('../docs/scripts/btcOperator'); floGlobals.application = application; })(); +const keys = require('./keys'); const DB = require("./database"); const App = require('./app'); @@ -178,17 +179,13 @@ module.exports = function startServer() { console.error('Password not entered!'); process.exit(1); } - global.myPrivKey = Crypto.AES.decrypt(_tmp, _pass); - global.myPubKey = floCrypto.getPubKeyHex(global.myPrivKey); - global.myFloID = floCrypto.getFloID(global.myPubKey); - if (!global.myFloID || !global.myPubKey || !global.myPrivKey) - throw "Invalid Keys"; + keys.node_priv = Crypto.AES.decrypt(_tmp, _pass); } catch (error) { console.error('Unable to load private key!'); process.exit(1); } - console.log("Logged in as", global.myFloID); + console.log("Logged in as", keys.node_id); DB.connect(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(result => { app = new App(config['secret']); diff --git a/src/market.js b/src/market.js index f23c04f..d865dae 100644 --- a/src/market.js +++ b/src/market.js @@ -1,14 +1,12 @@ 'use strict'; const coupling = require('./coupling'); +const price = require("./price"); const background = require('./background'); const DB = require("./database"); - -const blockchain = background.blockchain; +const blockchain = require('./blockchain'); const { - PERIOD_INTERVAL, - WAIT_TIME, TRADE_HASH_PREFIX, TRANSFER_HASH_PREFIX } = require('./_constants')["market"]; @@ -41,7 +39,7 @@ function getRateHistory(asset, duration) { if (!asset || !assetList.includes(asset)) reject(INVALID(eCode.INVALID_TOKEN_NAME, `Invalid asset(${asset})`)); else - coupling.price.getHistory(asset, duration) + price.getHistory(asset, duration) .then(result => resolve(result)) .catch(error => reject(error)) }) @@ -469,58 +467,14 @@ function checkDistributor(floID, asset) { }) } -//Periodic Process - -function periodicProcess_start() { - periodicProcess_stop(); - periodicProcess(); - assetList.forEach(asset => coupling.initiate(asset, true)); - coupling.price.storeHistory.start(); - periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL); -} - -function periodicProcess_stop() { - if (periodicProcess.instance !== undefined) { - clearInterval(periodicProcess.instance); - delete periodicProcess.instance; - } - coupling.stopAll(); - coupling.price.storeHistory.stop(); -}; - -var lastSyncBlockHeight = 0; - -function periodicProcess() { - if (periodicProcess.timeout !== undefined) { - clearTimeout(periodicProcess.timeout); - delete periodicProcess.timeout; - } - if (!blockchain.chests.list.length) - return periodicProcess.timeout = setTimeout(periodicProcess, WAIT_TIME); - - floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => { - if (lastSyncBlockHeight < result.blocks[0].height) { - lastSyncBlockHeight = result.blocks[0].height; - background.process(); - console.log("Last Block :", lastSyncBlockHeight); - } - }).catch(error => console.error(error)); -} - module.exports = { login, logout, get rates() { - return coupling.price.currentRates; + return price.currentRates; }, get priceCountDown() { - return coupling.price.lastTimes; - }, - get chests() { - return blockchain.chests; - }, - set chests(c) { - blockchain.chests = c; + return price.lastTimes; }, addBuyOrder, addSellOrder, @@ -539,18 +493,11 @@ module.exports = { removeTag, addDistributor, removeDistributor, - periodicProcess: { - start: periodicProcess_start, - stop: periodicProcess_stop - }, set assetList(assets) { assetList = assets; background.assetList = assets; }, get assetList() { return assetList - }, - set collectAndCall(fn) { - blockchain.collectAndCall = fn; - }, + } }; \ No newline at end of file