Upgrade Keys module

Keys module
- Private keys of node and sinks (shares) are managed by keys module
- sink_chest is moved here
- Separate sinkIDs for each service

Sink Private key security improvements
- Shares are stored as index in SQL
- Indexes are mapped with sinkIDs in file system
- File is shuffled on interval
- Every file data is stored in encrypted format

Other changes
- changes required for the above
- RefundTransact is now dedicated to the Convert service only (as every service has a different sinkID)
- moved periodicProcess to background.js
This commit is contained in:
sairajzero 2022-11-15 03:18:23 +05:30
parent 178fcf4ce2
commit 6c0237b0b5
10 changed files with 761 additions and 358 deletions

View File

@ -285,7 +285,7 @@ CREATE TABLE DirectConvert(
PRIMARY KEY(id)
);
CREATE TABLE RefundTransact(
CREATE TABLE RefundConvert(
id INT NOT NULL AUTO_INCREMENT,
floID CHAR(34) NOT NULL,
asset_type TINYINT NOT NULL,
@ -317,12 +317,27 @@ CREATE table _backupCache(
);
CREATE TABLE sinkShares(
floID CHAR(34) NOT NULL,
num INT NOT NULL AUTO_INCREMENT,
share TEXT,
time_stored TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(num)
);
CREATE TABLE discardedSinks(
id INT AUTO_INCREMENT,
floID CHAR(34) NOT NULL,
discard_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY(id),
PRIMARY KEY(floID)
);
CREATE TRIGGER discardedSinks_I AFTER INSERT ON discardedSinks
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER discardedSinks_U AFTER UPDATE ON discardedSinks
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER discardedSinks_D AFTER DELETE ON discardedSinks
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('discardedSinks', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT;
CREATE TRIGGER RequestLog_I AFTER INSERT ON RequestLog
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RequestLog', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER RequestLog_U AFTER UPDATE ON RequestLog
@ -400,12 +415,12 @@ FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('DirectConvert', NEW.id) O
CREATE TRIGGER DirectConvert_D AFTER DELETE ON DirectConvert
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('DirectConvert', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT;
CREATE TRIGGER RefundTransact_I AFTER INSERT ON RefundTransact
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER RefundTransact_U AFTER UPDATE ON RefundTransact
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER RefundTransact_D AFTER DELETE ON RefundTransact
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundTransact', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT;
CREATE TRIGGER RefundConvert_I AFTER INSERT ON RefundConvert
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER RefundConvert_U AFTER UPDATE ON RefundConvert
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;
CREATE TRIGGER RefundConvert_D AFTER DELETE ON RefundConvert
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('RefundConvert', OLD.id) ON DUPLICATE KEY UPDATE mode=NULL, u_time=DEFAULT;
CREATE TRIGGER UserTag_I AFTER INSERT ON UserTag
FOR EACH ROW INSERT INTO _backup (t_name, id) VALUES ('UserTag', NEW.id) ON DUPLICATE KEY UPDATE mode=TRUE, u_time=DEFAULT;

View File

@ -18,7 +18,7 @@ TRUNCATE CloseBondTransact;
TRUNCATE CloseFundTransact;
TRUNCATE ConvertFund;
TRUNCATE DirectConvert;
TRUNCATE RefundTransact;
TRUNCATE RefundConvert;
/* Blockchain data */
TRUNCATE LastTx;

View File

@ -6,10 +6,17 @@ module.exports = {
SIGN_EXPIRE_TIME: 5 * 60 * 1000, //5 mins
MAX_SESSION_TIMEOUT: 30 * 24 * 60 * 60 * 1000, //30 days
},
market: {
background: {
PERIOD_INTERVAL: 5 * 60 * 1000, //5 min,
WAIT_TIME: 2 * 60 * 1000, //2 mins,
REQUEST_TIMEOUT: 24 * 60 * 60 * 1000, //1 day
},
keys: {
SHARES_PER_NODE: 8,
SHARE_THRESHOLD: 50 / 100, //50%
DISCARD_COOLDOWN: 24 * 60 * 60 * 1000, //1 day
},
market: {
LAUNCH_SELLER_TAG: "launch-seller",
MAXIMUM_LAUNCH_SELL_CHIPS: 100000,
TRADE_HASH_PREFIX: "z1",
@ -29,7 +36,6 @@ module.exports = {
MIN_FUND: 0.3 // 30%
},
backup: {
SHARE_THRESHOLD: 50 / 100, //50%
HASH_N_ROW: 100,
BACKUP_INTERVAL: 5 * 60 * 1000, //5 min
BACKUP_SYNC_TIMEOUT: 10 * 60 * 1000, //10 mins

View File

@ -1,15 +1,22 @@
'use strict';
const keys = require('./keys');
const blockchain = require('./blockchain');
const conversion_rates = require('./services/conversion').getRate;
const bond_util = require('./services/bonds').util;
const fund_util = require('./services/bobs-fund').util;
const pCode = require('../docs/scripts/floExchangeAPI').processCode;
const DB = require("./database");
const coupling = require('./coupling');
const price = require('./price');
const {
PERIOD_INTERVAL,
REQUEST_TIMEOUT
} = require('./_constants')['background'];
const {
LAUNCH_SELLER_TAG,
MAXIMUM_LAUNCH_SELL_CHIPS,
REQUEST_TIMEOUT,
MAXIMUM_LAUNCH_SELL_CHIPS
} = require('./_constants')["market"];
var assetList; //container and allowed assets
@ -20,7 +27,7 @@ const verifyTx = {};
function confirmDepositFLO() {
DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, pCode.STATUS_PENDING]).then(results => {
results.forEach(r => {
verifyTx.FLO(r.floID, r.txid).then(amount => {
verifyTx.FLO(r.floID, r.txid, keys.sink_groups.EXCHANGE).then(amount => {
addSellChipsIfLaunchSeller(r.floID, amount).then(txQueries => {
txQueries.push(updateBalance.add(r.floID, "FLO", amount));
txQueries.push(["UPDATE VaultTransactions SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id]]);
@ -38,7 +45,7 @@ function confirmDepositFLO() {
}).catch(error => console.error(error))
}
verifyTx.FLO = function (sender, txid) {
verifyTx.FLO = function (sender, txid, group) {
return new Promise((resolve, reject) => {
floBlockchainAPI.getTx(txid).then(tx => {
let vin_sender = tx.vin.filter(v => v.addr === sender)
@ -50,7 +57,7 @@ verifyTx.FLO = function (sender, txid) {
return reject([false, "Transaction not included in any block yet"]);
if (!tx.confirmations)
return reject([false, "Transaction not confirmed yet"]);
let amount = tx.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
let amount = tx.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
if (amount == 0)
return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes)
else
@ -95,7 +102,7 @@ function addSellChipsIfLaunchSeller(floID, quantity) {
function confirmDepositToken() {
DB.query("SELECT id, floID, txid FROM VaultTransactions WHERE mode=? AND asset_type=? AND r_status=?", [pCode.VAULT_MODE_DEPOSIT, pCode.ASSET_TYPE_TOKEN, pCode.STATUS_PENDING]).then(results => {
results.forEach(r => {
verifyTx.token(r.floID, r.txid).then(({ token, amount, flo_amount }) => {
verifyTx.token(r.floID, r.txid, keys.sink_groups.EXCHANGE).then(({ token, amount, flo_amount }) => {
DB.query("SELECT id FROM VaultTransactions where floID=? AND mode=? AND asset=? AND asset_type=? AND txid=?", [r.floID, pCode.VAULT_MODE_DEPOSIT, "FLO", pCode.ASSET_TYPE_COIN, r.txid]).then(result => {
let txQueries = [];
//Add the FLO balance if necessary
@ -119,7 +126,7 @@ function confirmDepositToken() {
}).catch(error => console.error(error))
}
verifyTx.token = function (sender, txid, currencyOnly = false) {
verifyTx.token = function (sender, txid, group, currencyOnly = false) {
return new Promise((resolve, reject) => {
floTokenAPI.getTx(txid).then(tx => {
if (tx.parsedFloData.type !== "transfer")
@ -135,7 +142,7 @@ verifyTx.token = function (sender, txid, currencyOnly = false) {
let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender)
if (!vin_sender.length)
return reject([true, "Transaction not sent by the sender"]);
let flo_amount = tx.transactionDetails.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
let flo_amount = tx.transactionDetails.vout.reduce((a, v) => keys.sink_chest.includes(group, v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
if (flo_amount == 0)
return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes)
else
@ -188,7 +195,7 @@ function confirmVaultWithdraw() {
}).catch(error => console.error(error));
}
verifyTx.BTC = function (sender, txid) {
verifyTx.BTC = function (sender, txid, group) {
return new Promise((resolve, reject) => {
btcOperator.getTx(txid).then(tx => {
let vin_sender = tx.inputs.filter(v => floCrypto.isSameAddr(v.address, sender))
@ -201,7 +208,7 @@ verifyTx.BTC = function (sender, txid) {
if (!tx.confirmations)
return reject([false, "Transaction not confirmed yet"]);
let amount = tx.outputs.reduce((a, v) =>
blockchain.chests.includes(floCrypto.toFloID(v.address, { bech: [coinjs.bech32.version] })) ? a + parseFloat(v.value) : a, 0);
keys.sink_chest.includes(group, floCrypto.toFloID(v.address, { bech: [coinjs.bech32.version] })) ? a + parseFloat(v.value) : a, 0);
if (amount == 0)
return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes)
else
@ -213,7 +220,7 @@ verifyTx.BTC = function (sender, txid) {
function verifyConvert() {
//Set all timeout convert request to refund mode (thus, asset will be refund if tx gets confirmed later)
let req_timeout = new Date(Date.now() - REQUEST_TIMEOUT),
to_refund_sql = "INSERT INTO RefundTransact (floID, in_txid, asset_type, asset, r_status)" +
to_refund_sql = "INSERT INTO RefundConvert (floID, in_txid, asset_type, asset, r_status)" +
" SELECT floID, in_txid, ? AS asset_type, ? AS asset, r_status" +
" WHERE r_status=? AND locktime<? AND mode=?";
let txQueries = [];
@ -224,7 +231,7 @@ function verifyConvert() {
DB.query("SELECT id, floID, mode, in_txid, amount, quantity FROM DirectConvert WHERE r_status=? AND coin=?", [pCode.STATUS_PENDING, "BTC"]).then(results => {
results.forEach(r => {
if (r.mode == pCode.CONVERT_MODE_GET) {
verifyTx.token(r.floID, r.in_txid, true).then(({ amount }) => {
verifyTx.token(r.floID, r.in_txid, keys.sink_groups.CONVERT, true).then(({ amount }) => {
if (r.amount !== amount)
throw ([true, "Transaction amount mismatched in blockchain"]);
conversion_rates.BTC_INR().then(rate => {
@ -237,7 +244,7 @@ function verifyConvert() {
.then(_ => null).catch(error => console.error(error));
});
} else if (r.mode == pCode.CONVERT_MODE_PUT) {
verifyTx.BTC(r.floID, r.in_txid).then(quantity => {
verifyTx.BTC(r.floID, r.in_txid, keys.sink_groups.CONVERT).then(quantity => {
if (r.quantity !== quantity)
throw ([true, "Transaction quantity mismatched in blockchain"]);
conversion_rates.BTC_INR().then(rate => {
@ -293,7 +300,7 @@ function verifyConvertFundDeposit() {
DB.query("SELECT id, mode, txid, coin FROM ConvertFund WHERE r_status=? AND coin=?", [pCode.STATUS_PROCESSING, "BTC"]).then(results => {
results.forEach(r => {
if (r.mode == pCode.CONVERT_MODE_GET) { //deposit currency
verifyTx.token(floGlobals.adminID, r.txid, true).then(({ amount }) => {
verifyTx.token(floGlobals.adminID, r.txid, keys.sink_groups.CONVERT, true).then(({ amount }) => {
DB.query("UPDATE ConvertFund SET r_status=?, amount=? WHERE id=?", [pCode.STATUS_SUCCESS, amount, r.id])
.then(result => console.info(`Deposit-fund ${amount} ${floGlobals.currency} successful`))
.catch(error => console.error(error));
@ -304,7 +311,7 @@ function verifyConvertFundDeposit() {
.then(_ => null).catch(error => console.error(error));
});
} else if (r.mode == pCode.CONVERT_MODE_PUT) {//deposit coin
verifyTx.BTC(floGlobals.adminID, r.txid).then(quantity => {
verifyTx.BTC(floGlobals.adminID, r.txid, keys.sink_groups.CONVERT).then(quantity => {
DB.query("UPDATE ConvertFund SET r_status=?, quantity=? WHERE id=?", [pCode.STATUS_SUCCESS, quantity, r.id])
.then(result => console.info(`Deposit-fund ${quantity} ${r.coin} successful`))
.catch(error => console.error(error));
@ -354,67 +361,47 @@ function confirmConvertFundWithdraw() {
}).catch(error => console.error(error))
}
function verifyRefund() {
DB.query("SELECT id, floID, asset_type, asset, in_txid FROM RefundTransact WHERE r_status=?", [pCode.STATUS_PENDING]).then(results => {
function verifyConvertRefund() {
DB.query("SELECT id, floID, asset_type, asset, in_txid FROM RefundConvert WHERE r_status=?", [pCode.STATUS_PENDING]).then(results => {
results.forEach(r => {
if (r.ASSET_TYPE_COIN) {
if (r.asset == "FLO")
verifyTx.FLO(r.floID, r.in_txid)
.then(amount => blockchain.refundTransact.init(r.floID, r.asset, amount, r.id))
if (r.asset == "BTC") //Convert is only for BTC right now
verifyTx.BTC(r.floID, r.in_txid, keys.sink_groups.CONVERT)
.then(amount => blockchain.refundConvert.init(r.floID, r.asset, amount, r.id))
.catch(error => {
console.error(error);
if (error[0])
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id])
.then(_ => null).catch(error => console.error(error));
});
else if (r.asset == "BTC")
verifyTx.BTC(r.floID, r.in_txid)
.then(amount => blockchain.refundTransact.init(r.floID, r.asset, amount, r.id))
.catch(error => {
console.error(error);
if (error[0])
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id])
DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id])
.then(_ => null).catch(error => console.error(error));
});
} else if (r.ASSET_TYPE_TOKEN)
verifyTx.token(r.floID, r.in_txid).then(({ token, amount }) => {
if (token !== r.asset)
throw ([true, "Transaction token mismatched"]);
else
blockchain.refundTransact.init(r.floID, token, amount, r.id);
verifyTx.token(r.floID, r.in_txid, keys.sink_groups.CONVERT, true).then(({ amount }) => {
blockchain.refundConvert.init(r.floID, floGlobals.currency, amount, r.id);
}).catch(error => {
console.error(error);
if (error[0])
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id])
DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_REJECTED, r.id])
.then(_ => null).catch(error => console.error(error));
});
})
}).catch(error => console.error(error))
}
function retryRefund() {
DB.query("SELECT id, floID, asset, amount FROM RefundTransact WHERE r_status=?", [pCode.STATUS_PROCESSING]).then(results => {
results.forEach(r => blockchain.refundTransact.retry(r.floID, r.asset, r.amount, r.id))
function retryConvertRefund() {
DB.query("SELECT id, floID, asset, amount FROM RefundConvert WHERE r_status=?", [pCode.STATUS_PROCESSING]).then(results => {
results.forEach(r => blockchain.refundConvert.retry(r.floID, r.asset, r.amount, r.id))
}).catch(error => console.error(error))
}
function confirmRefund() {
DB.query("SELECT * FROM RefundTransact WHERE r_status=?", [pCode.STATUS_CONFIRMATION]).then(results => {
results.forEach(r => { //TODO
function confirmConvertRefund() {
DB.query("SELECT * FROM RefundConvert WHERE r_status=?", [pCode.STATUS_CONFIRMATION]).then(results => {
results.forEach(r => {
if (r.ASSET_TYPE_COIN) {
if (r.asset == "FLO")
floBlockchainAPI.getTx(r.out_txid).then(tx => {
if (!tx.blockheight || !tx.confirmations) //Still not confirmed
return;
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id])
.then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`))
.catch(error => console.error(error))
}).catch(error => console.error(error));
else if (r.asset == "BTC")
if (r.asset == "BTC") //Convert is only for BTC right now
btcOperator.getTx(r.out_txid).then(tx => {
if (!tx.blockhash || !tx.confirmations) //Still not confirmed
return;
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id])
DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id])
.then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`))
.catch(error => console.error(error))
}).catch(error => console.error(error));
@ -422,7 +409,7 @@ function confirmRefund() {
floTokenAPI.getTx(r.out_txid).then(tx => {
if (!tx.transactionDetails.blockheight || !tx.transactionDetails.confirmations) //Still not confirmed
return;
DB.query("UPDATE RefundTransact SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id])
DB.query("UPDATE RefundConvert SET r_status=? WHERE id=?", [pCode.STATUS_SUCCESS, r.id])
.then(result => console.info(`Refunded ${r.amount} ${r.asset} to ${r.floID}`))
.catch(error => console.error(error));
}).catch(error => console.error(error));
@ -443,7 +430,7 @@ function confirmBondClosing() {
if (!tx.blockhash || !tx.confirmations) //Still not confirmed
return;
let closeBondString = bond_util.stringify.end(r.bond_id, r.end_date, r.btc_net, r.usd_net, r.amount, r.ref_sign, r.txid);
floBlockchainAPI.writeData(global.myFloID, closeBondString, global.myPrivKey, bond_util.config.adminID).then(txid => {
floBlockchainAPI.writeData(keys.node_id, closeBondString, keys.node_priv, bond_util.config.adminID).then(txid => {
DB.query("UPDATE CloseBondTransact SET r_status=?, close_id=? WHERE id=?", [pCode.STATUS_SUCCESS, txid, r.id])
.then(result => console.info("Bond closed:", r.bond_id))
.catch(error => console.error(error));
@ -466,7 +453,7 @@ function confirmFundClosing() {
if (!tx.blockhash || !tx.confirmations) //Still not confirmed
return;
let closeFundString = fund_util.stringify.end(r.fund_id, r.floID, r.end_date, r.btc_net, r.usd_net, r.amount, r.ref_sign, r.txid);
floBlockchainAPI.writeData(global.myFloID, closeFundString, global.myPrivKey, fund_util.config.adminID).then(txid => {
floBlockchainAPI.writeData(keys.node_id, closeFundString, keys.node_priv, fund_util.config.adminID).then(txid => {
DB.query("UPDATE CloseFundTransact SET r_status=?, close_id=? WHERE id=?", [pCode.STATUS_SUCCESS, txid, r.id])
.then(result => console.info("Fund investment closed:", r.fund_id, r.floID))
.catch(error => console.error(error));
@ -476,34 +463,75 @@ function confirmFundClosing() {
}).catch(error => console.error(error))
}
//Periodic Process
function processAll() {
//deposit-withdraw asset balance
confirmDepositFLO();
confirmDepositToken();
retryVaultWithdrawal();
confirmVaultWithdraw();
if (keys.sink_chest.list(keys.sink_groups.EXCHANGE).length) {
confirmDepositFLO();
confirmDepositToken();
retryVaultWithdrawal();
confirmVaultWithdraw();
}
//convert service
verifyConvert();
retryConvert();
confirmConvert();
verifyConvertFundDeposit();
retryConvertFundWithdraw();
confirmConvertFundWithdraw();
if (keys.sink_chest.list(keys.sink_groups.CONVERT).length) {
verifyConvert();
retryConvert();
confirmConvert();
verifyConvertFundDeposit();
retryConvertFundWithdraw();
confirmConvertFundWithdraw();
verifyConvertRefund();
retryConvertRefund();
confirmConvertRefund();
}
//blockchain-bond service
retryBondClosing();
confirmBondClosing();
if (keys.sink_chest.list(keys.sink_groups.BLOCKCHAIN_BONDS).length) {
retryBondClosing();
confirmBondClosing();
}
//bob's fund service
retryFundClosing();
confirmFundClosing();
//refund transactions
verifyRefund();
retryRefund();
confirmRefund();
if (keys.sink_chest.list(keys.sink_groups.EXCHANGE).length) {
retryFundClosing();
confirmFundClosing();
}
}
var lastSyncBlockHeight = 0;
//Runs the background pass only when a new block has been mined since the last run
function periodicProcess() {
    floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => {
        const latestHeight = result.blocks[0].height;
        if (latestHeight > lastSyncBlockHeight) {
            lastSyncBlockHeight = latestHeight;
            processAll();
            console.log("Last Block :", lastSyncBlockHeight);
        }
    }).catch(error => console.error(error));
}
//Starts the background machinery: runs one pass immediately, initiates
//coupling for every listed asset, starts price-history recording and
//schedules the periodic run.
function periodicProcess_start() {
    periodicProcess_stop(); //ensure no previous timer is still alive
    periodicProcess();
    for (const asset of assetList)
        coupling.initiate(asset, true);
    price.storeHistory.start();
    periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL);
}
//Stops the background machinery: cancels the scheduled timer (if any),
//stops coupling for all assets and halts price-history recording.
function periodicProcess_stop() {
    if (typeof periodicProcess.instance !== 'undefined') {
        clearInterval(periodicProcess.instance);
        delete periodicProcess.instance;
    }
    coupling.stopAll();
    price.storeHistory.stop();
}
module.exports = {
blockchain,
process: processAll,
periodicProcess: {
start: periodicProcess_start,
stop: periodicProcess_stop
},
set assetList(assets) {
assetList = assets;
},

View File

@ -1,93 +1,74 @@
'use strict';
const keys = require('../keys');
const K_Bucket = require('../../docs/scripts/floExchangeAPI').K_Bucket;
const slave = require('./slave');
const sync = require('./sync');
const WebSocket = require('ws');
const DB = require("../database");
const {
SHARE_THRESHOLD
} = require("../_constants")["backup"];
const { BACKUP_INTERVAL } = require("../_constants")["backup"];
const { DISCARD_COOLDOWN } = require("../_constants")["keys"];
var app, wss, tokenList; //Container for app and wss
var _app, _wss, tokenList; //Container for app and wss
var nodeList, nodeURL, nodeKBucket; //Container for (backup) node list
const connectedSlaves = {},
shares_collected = {},
shares_pending = {};
shares_pending = {},
discarded_sinks = [];
var _mode = null;
const SLAVE_MODE = 0,
MASTER_MODE = 1;
const sinkList = {};
const chests = {
get list() {
return Object.keys(sinkList);
},
get activeList() {
let result = [];
for (let id in sinkList)
if (sinkList[id])
result.push(id);
return result;
},
includes(id) {
return (id in sinkList);
},
isActive(id) {
return (id in sinkList && sinkList[id] !== null);
},
get pick() {
let sinks = Object.keys(sinkList),
i = floCrypto.randInt(0, sinks.length);
return sinks[i];
}
};
//Shares
function generateShares(sinkKey) {
let nextNodes = nodeKBucket.nextNode(global.myFloID, null),
let nextNodes = nodeKBucket.nextNode(keys.node_id, null),
aliveNodes = Object.keys(connectedSlaves);
nextNodes.unshift(global.myFloID);
aliveNodes.unshift(global.myFloID);
let N = nextNodes.length + 1,
th = Math.ceil(aliveNodes.length * SHARE_THRESHOLD) + 1,
shares, refShare, mappedShares = {};
shares = floCrypto.createShamirsSecretShares(sinkKey, N, th);
refShare = shares.pop();
nextNodes.unshift(keys.node_id);
aliveNodes.unshift(keys.node_id);
let shares, mappedShares = {};
shares = keys.generateShares(sinkKey, nextNodes.length, aliveNodes.length);
for (let i in nextNodes)
mappedShares[nextNodes[i]] = [refShare, shares[i]].join("|");
mappedShares[nextNodes[i]] = shares[i];
return mappedShares;
}
function sendShare(ws, sinkID, keyShare) {
if (shares_pending[sinkID][ws.floID] === keyShare) //this should always be true unless there is an error
delete shares_pending[sinkID][ws.floID]; //delete the share after sending it to respective slave
ws.send(JSON.stringify({
//Delivers the pending key-shares of a sink to the given slave node,
//each share encrypted with the slave's public key.
function sendShares(ws, sinkID) {
    const pending = shares_pending[sinkID];
    if (!pending || !(ws.floID in pending.shares))
        return;
    const { ref, group } = pending;
    const nodeShares = pending.shares[ws.floID];
    delete pending.shares[ws.floID]; //remove after delivery so a share is sent only once
    for (const s of nodeShares)
        ws.send(JSON.stringify({
            command: "SINK_SHARE",
            sinkID, ref, group,
            share: floCrypto.encryptData(s, ws.pubKey)
        }));
}
function sendSharesToNodes(sinkID, shares) {
shares_pending[sinkID] = shares;
function sendSharesToNodes(sinkID, group, shares) {
if (discarded_sinks.includes(sinkID)) { //sinkID is discarded, abort the new shares
let i = discarded_sinks.findIndex(sinkID);
discarded_sinks.splice(i, 1);
return;
}
let ref = Date.now();
shares_pending[sinkID] = { shares, group, ref };
if (keys.node_id in shares) {
shares.forEach(s => keys.addShare(group, sinkID, ref, s))
delete shares_pending[sinkID].shares[keys.node_id];
}
for (let node in shares)
if (node in connectedSlaves)
sendShare(connectedSlaves[node], sinkID, shares[node]);
if (global.myFloID in shares) {
slave.storeShare(sinkID, shares[global.myFloID], false);
delete shares_pending[sinkID][global.myFloID];
}
sinkList[sinkID] = Date.now();
sendShares(connectedSlaves[node], sinkID);
keys.sink_chest.set_id(group, sinkID, ref);
}
function requestShare(ws, sinkID) {
function requestShare(ws, group, sinkID) {
ws.send(JSON.stringify({
command: "SEND_SHARE",
sinkID: sinkID,
pubKey: global.myPubKey
group, sinkID,
pubKey: keys.node_pub
}));
}
@ -126,20 +107,16 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) {
}
*/
function collectAndCall(sinkID, callback, timeout = null) {
function collectAndCall(group, sinkID, callback, timeout = null) {
if (!(callback instanceof Function))
throw Error("callback should be a function");
if (!(sinkID in shares_collected)) { //if not already collecting shares for sinkID, then initiate collection
shares_collected[sinkID] = {
callbacks: [],
shares: {}
};
shares_collected[sinkID] = { group, ref: 0, callbacks: [], shares: {} };
for (let floID in connectedSlaves)
requestShare(connectedSlaves[floID], sinkID);
DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => {
if (result.length)
collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey))
}).catch(error => console.error(error))
requestShare(connectedSlaves[floID], group, sinkID);
keys.getShares(group, sinkID)
.then(({ ref, shares }) => shares.forEach(s => collectShares(sinkID, ref, s)))
.catch(error => console.error(error))
}
shares_collected[sinkID].callbacks.push(callback);
if (timeout)
@ -153,14 +130,22 @@ function collectAndCall(sinkID, callback, timeout = null) {
collectAndCall.isAlive = (sinkID, callbackRef) => (sinkID in shares_collected && shares_collected[sinkID].callbacks.indexOf(callbackRef) != -1);
function collectShares(floID, sinkID, share) {
function collectShares(sinkID, ref, share) {
if (_mode !== MASTER_MODE)
return console.warn("Not serving as master");
if (!(sinkID in shares_collected))
return console.error("Something is wrong! Slaves are sending sinkID thats not been collected");
shares_collected[sinkID].shares[floID] = share.split("|");
if (shares_collected[sinkID].ref > ref)
return console.debug("Received expired share");
else if (shares_collected[sinkID].ref < ref) {
shares_collected[sinkID].ref = ref;
shares_collected[sinkID].shares = [];
}
if (shares_collected[sinkID].shares.includes(share))
return console.debug("Received duplicate share");
shares_collected[sinkID].shares.push(share);
try {
let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares)));
let sinkKey = floCrypto.retrieveShamirSecret(shares_collected[sinkID].shares);
if (floCrypto.verifyPrivKey(sinkKey, sinkID)) {
console.debug("Shares collected successfully for", sinkID);
shares_collected[sinkID].callbacks.forEach(fn => fn instanceof Function ? fn(sinkKey) : null);
@ -182,11 +167,11 @@ function connectWS(floID) {
function connectToMaster(i = 0, init = false) {
if (i >= nodeList.length) {
console.error("No master is found, and myFloID is not in list. This should not happen!");
console.error("No master is found and Node not in list. This should not happen!");
process.exit(1);
}
let floID = nodeList[i];
if (floID === myFloID)
if (floID === keys.node_id)
serveAsMaster(init);
else
connectWS(floID).then(ws => {
@ -199,32 +184,16 @@ function connectToMaster(i = 0, init = false) {
});
}
//Node becomes master
function serveAsMaster(init) {
console.info('Starting master process');
slave.stop();
_mode = MASTER_MODE;
informLiveNodes(init);
app.resume();
}
function serveAsSlave(ws, init) {
console.info('Starting slave process');
app.pause();
slave.start(ws, init);
_mode = SLAVE_MODE;
}
function informLiveNodes(init) {
let message = {
floID: global.myFloID,
floID: keys.node_id,
type: "UPDATE_MASTER",
pubKey: global.myPubKey,
pubKey: keys.node_pub,
req_time: Date.now()
};
message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey);
message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv);
message = JSON.stringify(message);
let nodes = nodeList.filter(n => n !== global.myFloID);
let nodes = nodeList.filter(n => n !== keys.node_id);
Promise.allSettled(nodes.map(n => connectWS(n))).then(result => {
let flag = false;
for (let i in result)
@ -237,20 +206,29 @@ function informLiveNodes(init) {
console.warn(`Node(${nodes[i]}) is offline`);
if (init && flag)
syncRequest();
DB.query("SELECT floID, share FROM sinkShares ORDER BY time_stored DESC").then(result => {
if (result.length)
result.forEach(r => reconstructShares(r.floID));
else if (!flag) {
keys.getStoredList().then(result => {
if (Object.keys(result).length) {
keys.getDiscardedList().then(discarded_list => {
for (let group in result)
result.group.forEach(id => {
if (!(id in discarded_list))
reconstructShares(group, id);
});
}).catch(error => console.error(error))
} else if (!flag) {
console.log("Starting the exchange...");
let newSink = floCrypto.generateNewID();
console.debug("Generated sink:", newSink.floID, newSink.privKey);
sendSharesToNodes(newSink.floID, generateShares(newSink.privKey));
//generate a sinkID for each group in starting list
keys.sink_groups.generate_list.forEach(group => {
let newSink = floCrypto.generateNewID();
console.debug("Generated sink:", group, newSink.floID);
sendSharesToNodes(newSink.floID, group, generateShares(newSink.privKey));
})
}
}).catch(error => console.error(error));
});
}
function syncRequest(cur = global.myFloID) {
function syncRequest(cur = keys.node_id) {
//Sync data from next available node
let nextNode = nodeKBucket.nextNode(cur);
if (!nextNode)
@ -261,16 +239,16 @@ function syncRequest(cur = global.myFloID) {
}
function updateMaster(floID) {
let currentMaster = _mode === MASTER_MODE ? global.myFloID : slave.masterWS.floID;
let currentMaster = _mode === MASTER_MODE ? keys.node_id : slave.masterWS.floID;
if (nodeList.indexOf(floID) < nodeList.indexOf(currentMaster))
connectToMaster();
}
function reconstructShares(sinkID) {
function reconstructShares(group, sinkID) {
if (_mode !== MASTER_MODE)
return console.warn("Not serving as master");
sinkList[sinkID] = null;
collectAndCall(sinkID, sinkKey => sendSharesToNodes(sinkID, generateShares(sinkKey)));
keys.sink_chest.set_id(group, sinkID, null);
collectAndCall(group, sinkID, sinkKey => sendSharesToNodes(sinkID, group, generateShares(sinkKey)));
}
function slaveConnect(floID, pubKey, ws, sinks) {
@ -282,29 +260,82 @@ function slaveConnect(floID, pubKey, ws, sinks) {
//Send shares if need to be delivered
for (let sinkID in shares_pending)
if (floID in shares_pending[sinkID])
sendShare(ws, sinkID, shares_pending[floID]);
if (floID in shares_pending[sinkID].shares)
sendShares(ws, sinkID);
//Request shares if any
for (let sinkID in shares_collected)
requestShare(ws, sinkID); //if (!(floID in shares_collected[sinkID].shares))
requestShare(ws, shares_collected[sinkID].group, sinkID);
//check if sinks in slaves are present
if (Array.isArray(sinks))
for (let sinkID of sinks)
if (!(sinkID in sinkList))
reconstructShares(sinkID);
/*
if (shares_pending === null || //The 1st backup is connected
Object.keys(connectedSlaves).length < Math.pow(SHARE_THRESHOLD, 2) * Object.keys(shares_pending).length) //re-calib shares for better
sendSharesToNodes(sinkID, generateShares(sinkPrivKey))
*/
if (Array.isArray(sinks)) {
for (let sinkID of sinks) {
if (!keys.sink_chest.includes(shares_collected[sinkID].group, sinkID))
keys.checkIfDiscarded(sinkID)
.then(result => result === false ? reconstructShares(shares_collected[sinkID].group, sinkID) : null)
.catch(error => console.error(error))
}
}
}
//Scans every sink in the chest and handles those marked as discarded:
//within the cooldown the sink is kept but deactivated; once the cooldown
//has elapsed it is removed from the chest entirely.
function checkForDiscardedSinks() {
    let cur_time = Date.now(),
        all_sinks = keys.sink_chest.get_all();
    for (let group in all_sinks)
        all_sinks[group].forEach(id => keys.checkIfDiscarded(id).then(result => {
            console.debug(group, id); //'let group' is block-scoped per iteration, so the async callback captures the right group
            if (result != false) { //result appears to be the discard timestamp when discarded, false otherwise — TODO confirm keys.checkIfDiscarded contract
                if (cur_time - result > DISCARD_COOLDOWN)
                    keys.sink_chest.rm_id(group, id); //cooldown elapsed: drop the sink entirely
                else
                    keys.sink_chest.set_id(group, id, null); //still in cooldown: keep the id but deactivate it
                if (id in shares_collected && !discarded_sinks.includes(id)) //FIX: was 'sinkID' (undefined) — ReferenceError whenever a discarded sink was found
                    discarded_sinks.push(id);
            }
        }).catch(error => console.debug(error)))
}
//Master interval process: periodic housekeeping that only runs while this node is master
function intervalProcess() {
    checkForDiscardedSinks();
}
intervalProcess.start = function () {
    intervalProcess.stop(); //never leave two timers running
    intervalProcess.instance = setInterval(intervalProcess, BACKUP_INTERVAL);
};
intervalProcess.stop = function () {
    if (typeof intervalProcess.instance !== 'undefined') {
        clearInterval(intervalProcess.instance);
        delete intervalProcess.instance;
    }
};
//Node becomes master
function serveAsMaster(init) {
    //Promote this node: stop the slave sync process, clear the in-memory sink
    //chest (keys will be re-collected), start master interval tasks, announce
    //to peer nodes and resume serving app requests. Order of calls matters.
    console.info('Starting master process');
    slave.stop();
    _mode = MASTER_MODE;
    keys.sink_chest.reset(); //drop cached sink ids/keys before re-collecting as master
    intervalProcess.start();
    informLiveNodes(init);
    _app.resume();
}
//Node becomes slave
function serveAsSlave(ws, init) {
    //Demote this node: stop master-only interval tasks, pause the app server,
    //then start the slave process over the master's websocket connection.
    console.info('Starting slave process');
    intervalProcess.stop();
    _app.pause();
    slave.start(ws, init);
    _mode = SLAVE_MODE;
}
//Transmistter
function startBackupTransmitter(server) {
wss = new WebSocket.Server({
_wss = new WebSocket.Server({
server
});
wss.on('connection', ws => {
_wss.on('connection', ws => {
ws.on('message', message => {
//verify if from a backup node
try {
@ -335,7 +366,7 @@ function startBackupTransmitter(server) {
slaveConnect(request.floID, request.pubKey, ws, request.sinks);
break;
case "SINK_SHARE":
collectShares(request.floID, request.sinkID, floCrypto.decryptData(request.share, global.myPrivKey))
collectShares(request.sinkID, request.ref, floCrypto.decryptData(request.share, keys.node_priv))
default:
invalid = "Invalid Request Type";
}
@ -361,16 +392,15 @@ function startBackupTransmitter(server) {
});
}
function initProcess(a) {
app = a;
app.chests = chests;
app.collectAndCall = collectAndCall;
startBackupTransmitter(app.server);
function initProcess(app) {
_app = app;
startBackupTransmitter(_app.server);
connectToMaster(0, true);
}
module.exports = {
init: initProcess,
collectAndCall,
set nodeList(list) {
nodeURL = list;
nodeKBucket = new K_Bucket(floGlobals.adminID, Object.keys(nodeURL));
@ -383,6 +413,6 @@ module.exports = {
tokenList = assets.filter(a => a.toUpperCase() !== "FLO");
},
get wss() {
return wss;
return _wss;
}
};

View File

@ -1,5 +1,6 @@
'use strict';
const keys = require("../keys");
const DB = require("../database");
const {
@ -21,18 +22,23 @@ function startSlaveProcess(ws, init) {
ws.on('message', processDataFromMaster);
masterWS = ws;
let sinks_stored = [];
DB.query("SELECT floID FROM sinkShares").then(result => {
sinks_stored = result.map(r => r.floID);
Promise.all([keys.getStoredList(), keys.getDiscardedList()]).then(result => {
let stored_list = result[0],
discarded_list = result[1];
for (let g in stored_list)
for (let id in stored_list[g])
if (!(stored_list[g][id] in discarded_list))
sinks_stored.push(id);
}).catch(error => console.error(error)).finally(_ => {
//inform master
let message = {
floID: global.myFloID,
pubKey: global.myPubKey,
floID: keys.node_id,
pubKey: keys.node_pub,
sinks: sinks_stored,
req_time: Date.now(),
type: "SLAVE_CONNECT"
}
message.sign = floCrypto.signData(message.type + "|" + message.req_time, global.myPrivKey);
message.sign = floCrypto.signData(message.type + "|" + message.req_time, keys.node_priv);
ws.send(JSON.stringify(message));
//start sync
if (init)
@ -58,14 +64,14 @@ function requestBackupSync(checksum_trigger, ws) {
return new Promise((resolve, reject) => {
DB.query('SELECT MAX(u_time) as last_time FROM _backup').then(result => {
let request = {
floID: global.myFloID,
pubKey: global.myPubKey,
floID: keys.node_id,
pubKey: keys.node_pub,
type: "BACKUP_SYNC",
last_time: result[0].last_time,
checksum: checksum_trigger,
req_time: Date.now()
};
request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey);
request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv);
ws.send(JSON.stringify(request));
resolve(request);
}).catch(error => reject(error))
@ -134,10 +140,10 @@ function processDataFromMaster(message) {
processBackupData(message);
else switch (message.command) {
case "SINK_SHARE":
storeSinkShare(message.sinkID, message.keyShare);
storeSinkShare(message.group, message.sinkID, message.share, message.ref);
break;
case "SEND_SHARE":
sendSinkShare(message.sinkID, message.pubKey);
sendSinkShare(message.group, message.sinkID, message.pubKey);
break;
case "REQUEST_ERROR":
console.log(message.error);
@ -150,30 +156,26 @@ function processDataFromMaster(message) {
}
}
function storeSinkShare(sinkID, keyShare, decrypt = true) {
if (decrypt)
keyShare = floCrypto.decryptData(keyShare, global.myPrivKey)
let encryptedShare = Crypto.AES.encrypt(keyShare, global.myPrivKey);
console.debug(Date.now(), '|sinkID:', sinkID, '|EnShare:', encryptedShare);
DB.query("INSERT INTO sinkShares (floID, share) VALUE (?) ON DUPLICATE KEY UPDATE share=?", [[sinkID, encryptedShare], encryptedShare])
function storeSinkShare(group, sinkID, share, ref) {
    //Decrypt the share (it was encrypted for this node's key) and hand it to
    //the keys module for persistent storage; failures are logged only.
    const decrypted = floCrypto.decryptData(share, keys.node_priv);
    keys.addShare(group, sinkID, ref, decrypted)
        .catch(error => console.error(error));
}
function sendSinkShare(sinkID, pubKey) {
DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => {
if (!result.length)
return console.warn(`key-shares for ${sinkID} not found in database!`);
let share = Crypto.AES.decrypt(result[0].share, global.myPrivKey);
let response = {
type: "SINK_SHARE",
sinkID: sinkID,
share: floCrypto.encryptData(share, pubKey),
floID: global.myFloID,
pubKey: global.myPubKey,
req_time: Date.now()
}
response.sign = floCrypto.signData(response.type + "|" + response.req_time, global.myPrivKey); //TODO: strengthen signature
masterWS.send(JSON.stringify(response));
function sendSinkShare(group, sinkID, pubKey) {
    //Deliver every stored share for the sink to the master, each share sent as
    //its own signed SINK_SHARE message, encrypted for the requester's pubKey.
    keys.getShares(group, sinkID).then(({ ref, shares }) => {
        for (const s of shares) {
            const response = {
                type: "SINK_SHARE",
                sinkID, ref,
                share: floCrypto.encryptData(s, pubKey),
                floID: keys.node_id,
                pubKey: keys.node_pub,
                req_time: Date.now()
            };
            response.sign = floCrypto.signData(response.type + "|" + response.req_time, keys.node_priv); //TODO: strengthen signature
            masterWS.send(JSON.stringify(response));
        }
    }).catch(error => console.error(error));
}
@ -356,13 +358,13 @@ function requestHash(tables) {
//TODO: resync only necessary data (instead of entire table)
let self = requestInstance;
let request = {
floID: global.myFloID,
pubKey: global.myPubKey,
floID: keys.node_id,
pubKey: keys.node_pub,
type: "HASH_SYNC",
tables: tables,
req_time: Date.now()
};
request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey);
request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv);
self.ws.send(JSON.stringify(request));
self.request = request;
self.checksum = null;
@ -411,13 +413,13 @@ function verifyHash(hashes) {
function requestTableChunks(tables, ws) {
let request = {
floID: global.myFloID,
pubKey: global.myPubKey,
floID: keys.node_id,
pubKey: keys.node_pub,
type: "RE_SYNC",
tables: tables,
req_time: Date.now()
};
request.sign = floCrypto.signData(request.type + "|" + request.req_time, global.myPrivKey);
request.sign = floCrypto.signData(request.type + "|" + request.req_time, keys.node_priv);
ws.send(JSON.stringify(request));
}
@ -427,6 +429,5 @@ module.exports = {
},
start: startSlaveProcess,
stop: stopSlaveProcess,
storeShare: storeSinkShare,
syncRequest: ws => requestInstance.open(ws)
}

View File

@ -1,17 +1,25 @@
'use strict';
const pCode = require('../docs/scripts/floExchangeAPI').processCode;
const { collectAndCall } = require('./backup/head');
const keys = require('./keys');
const DB = require("./database");
var collectAndCall; //container for collectAndCall function from backup module
var chests; //container for blockchain ids (where assets are stored)
const TYPE_VAULT = "VAULT",
TYPE_CONVERT = "CONVERT",
TYPE_CONVERT_POOL = "CONVERT_POOL",
TYPE_REFUND = "REFUND",
TYPE_BOND = "BOND",
TYPE_FUND = "BOB-FUND";
TYPE_CONVERT_REFUND = "REFUND",
TYPE_BLOCKCHAIN_BOND = "BOND",
TYPE_BOBS_FUND = "BOB-FUND";
const SINK_GROUP = {
[TYPE_VAULT]: keys.sink_groups.EXCHANGE,
[TYPE_CONVERT]: keys.sink_groups.CONVERT,
[TYPE_CONVERT_POOL]: keys.sink_groups.CONVERT,
[TYPE_CONVERT_REFUND]: keys.sink_groups.CONVERT,
[TYPE_BLOCKCHAIN_BOND]: keys.sink_groups.BLOCKCHAIN_BONDS,
[TYPE_BOBS_FUND]: keys.sink_groups.BOBS_FUND
}
const balance_locked = {},
balance_cache = {},
@ -19,9 +27,9 @@ const balance_locked = {},
[TYPE_VAULT]: {},
[TYPE_CONVERT]: {},
[TYPE_CONVERT_POOL]: {},
[TYPE_REFUND]: {},
[TYPE_BOND]: {},
[TYPE_FUND]: {}
[TYPE_CONVERT_REFUND]: {},
[TYPE_BLOCKCHAIN_BOND]: {},
[TYPE_BOBS_FUND]: {}
};
function getBalance(sinkID, asset) {
@ -36,10 +44,10 @@ function getBalance(sinkID, asset) {
}
}
function getSinkID(quantity, asset, sinkList = null) {
function getSinkID(type, quantity, asset, sinkList = null) {
return new Promise((resolve, reject) => {
if (!sinkList)
sinkList = chests.list.map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting
sinkList = keys.sink_chest.list(SINK_GROUP[type]).map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting
.sort((a, b) => b[1] - a[1]).map(x => x[0]);
if (!sinkList.length)
return reject(`Insufficient balance in chests for asset(${asset})`);
@ -51,12 +59,12 @@ function getSinkID(quantity, asset, sinkList = null) {
if (balance > (quantity + (sinkID in balance_locked ? balance_locked[sinkID][asset] || 0 : 0)))
return resolve(sinkID);
else
getSinkID(quantity, asset, sinkList)
getSinkID(type, quantity, asset, sinkList)
.then(result => resolve(result))
.catch(error => reject(error))
}).catch(error => {
console.error(error);
getSinkID(quantity, asset, sinkList)
getSinkID(type, quantity, asset, sinkList)
.then(result => resolve(result))
.catch(error => reject(error))
});
@ -67,9 +75,9 @@ const WITHDRAWAL_MESSAGE = {
[TYPE_VAULT]: "(withdrawal from market)",
[TYPE_CONVERT]: "(convert coin)",
[TYPE_CONVERT_POOL]: "(convert fund)",
[TYPE_REFUND]: "(refund from market)",
[TYPE_BOND]: "(bond closing)",
[TYPE_FUND]: "(fund investment closing)"
[TYPE_CONVERT_REFUND]: "(refund from market)",
[TYPE_BLOCKCHAIN_BOND]: "(bond closing)",
[TYPE_BOBS_FUND]: "(fund investment closing)"
}
function sendTx(floID, asset, quantity, sinkID, sinkKey, message) {
@ -89,14 +97,14 @@ const updateSyntax = {
[TYPE_VAULT]: "UPDATE VaultTransactions SET r_status=?, txid=? WHERE id=?",
[TYPE_CONVERT]: "UPDATE DirectConvert SET r_status=?, out_txid=? WHERE id=?",
[TYPE_CONVERT_POOL]: "UPDATE ConvertFund SET r_status=?, txid=? WHERE id=?",
[TYPE_REFUND]: "UPDATE RefundTransact SET r_status=?, out_txid=? WHERE id=?",
[TYPE_BOND]: "UPDATE CloseBondTransact SET r_status=?, txid=? WHERE id=?",
[TYPE_FUND]: "UPDATE CloseFundTransact SET r_status=?, txid=? WHERE id=?"
[TYPE_CONVERT_REFUND]: "UPDATE RefundConvert SET r_status=?, out_txid=? WHERE id=?",
[TYPE_BLOCKCHAIN_BOND]: "UPDATE CloseBondTransact SET r_status=?, txid=? WHERE id=?",
[TYPE_BOBS_FUND]: "UPDATE CloseFundTransact SET r_status=?, txid=? WHERE id=?"
};
function sendAsset(floID, asset, quantity, type, id) {
quantity = global.toStandardDecimal(quantity);
getSinkID(quantity, asset).then(sinkID => {
getSinkID(type, quantity, asset).then(sinkID => {
let callback = (sinkKey) => {
//Send asset to user via API
sendTx(floID, asset, quantity, sinkID, sinkKey, WITHDRAWAL_MESSAGE[type]).then(txid => {
@ -110,7 +118,7 @@ function sendAsset(floID, asset, quantity, type, id) {
balance_locked[sinkID][asset] -= quantity;
});
}
collectAndCall(sinkID, callback);
collectAndCall(sinkID, callback); //TODO: add timeout to prevent infinite wait
callbackCollection[type][id] = callback;
if (!(sinkID in balance_locked))
balance_locked[sinkID] = {};
@ -165,39 +173,30 @@ function convertFundWithdraw_retry(asset, amount, id) {
}
function bondTransact_retry(floID, amount, btc_rate, usd_rate, id) {
if (id in callbackCollection[TYPE_BOND])
if (id in callbackCollection[TYPE_BLOCKCHAIN_BOND])
console.debug("A callback is already pending for this Bond closing");
else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BOND, id);
else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BLOCKCHAIN_BOND, id);
}
function fundTransact_retry(floID, amount, btc_rate, usd_rate, id) {
if (id in callbackCollection[TYPE_FUND])
if (id in callbackCollection[TYPE_BOBS_FUND])
console.debug("A callback is already pending for this Fund investment closing");
else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_FUND, id);
else sendAsset(floID, "BTC", amount / (btc_rate * usd_rate), TYPE_BOBS_FUND, id);
}
function refundTransact_init(floID, asset, amount, id) {
function refundConvert_init(floID, asset, amount, id) {
amount = global.toStandardDecimal(amount);
DB.query("UPDATE RefundTransact SET amount=?, r_status=?, locktime=DEFAULT WHERE id=?", [amount, pCode.STATUS_PROCESSING, id])
.then(result => sendAsset(floID, asset, amount, TYPE_REFUND, id))
DB.query("UPDATE RefundConvert SET amount=?, r_status=?, locktime=DEFAULT WHERE id=?", [amount, pCode.STATUS_PROCESSING, id])
.then(result => sendAsset(floID, asset, amount, TYPE_CONVERT_REFUND, id))
.catch(error => console.error(error))
}
function refundTransact_retry(floID, asset, amount, id) {
if (id in callbackCollection[TYPE_REFUND])
function refundConvert_retry(floID, asset, amount, id) {
if (id in callbackCollection[TYPE_CONVERT_REFUND])
console.debug("A callback is already pending for this Refund");
else sendAsset(floID, asset, amount, TYPE_REFUND, id);
else sendAsset(floID, asset, amount, TYPE_CONVERT_REFUND, id);
}
module.exports = {
set collectAndCall(fn) {
collectAndCall = fn;
},
get chests() {
return chests;
},
set chests(c) {
chests = c;
},
withdrawAsset: {
init: withdrawAsset_init,
retry: withdrawAsset_retry
@ -219,8 +218,8 @@ module.exports = {
fundTransact: {
retry: fundTransact_retry
},
refundTransact: {
init: refundTransact_init,
retry: refundTransact_retry
refundConvert: {
init: refundConvert_init,
retry: refundConvert_retry
}
}

380
src/keys.js Normal file
View File

@ -0,0 +1,380 @@
'use strict';
const fs = require('fs');
const path = require('path');
const DB = require("./database");
const { SHARES_PER_NODE, SHARE_THRESHOLD } = require("./_constants")["keys"];
const PRIV_EKEY_MIN = 32,
PRIV_EKEY_MAX = 48,
PRIME_FILE_TYPE = 'binary',
INDEX_FILE_TYPE = 'utf-8',
INT_MIN = -2147483648,
INT_MAX = 2147483647,
INDEX_FILE_NAME_LENGTH = 16,
INDEX_FILE_EXT = '.txt',
MIN_DUMMY_FILES = 16,
MAX_DUMMY_FILES = 24,
MIN_DUMMY_SIZE_MUL = 0.5,
MAX_DUMMY_SIZE_MUL = 1.5,
SIZE_FACTOR = 100;
var node_priv, e_key, node_id, node_pub; //containers for node-key wrapper

//BUGFIX: inside an object literal, 'this' does NOT refer to the literal being
//built (at CommonJS top level it is module.exports), so this.args_dir was
//undefined and path.join threw at load time. Compute the base dir once here.
const ARGS_DIR = path.resolve(__dirname, '..', '..', 'args');

const _ = {
    /* Internal key wrapper: the node's private key is held AES-encrypted in
       memory under a random ephemeral key and only decrypted on access. */
    get node_priv() {
        if (!node_priv || !e_key)
            throw Error("keys not set");
        return Crypto.AES.decrypt(node_priv, e_key);
    },
    set node_priv(key) {
        node_pub = floCrypto.getPubKeyHex(key);
        node_id = floCrypto.getFloID(node_pub);
        if (!key || !node_pub || !node_id)
            throw Error("Invalid Keys");
        let n = floCrypto.randInt(PRIV_EKEY_MIN, PRIV_EKEY_MAX);
        e_key = floCrypto.randString(n);
        node_priv = Crypto.AES.encrypt(key, e_key);
    },
    args_dir: ARGS_DIR,
    index_dir: path.join(ARGS_DIR, 'indexes'),
    prime_file: path.join(ARGS_DIR, 'prime_index.b'),
    get index_file() {
        //Resolve the current (secret) index filename, stored encrypted in the prime file
        try {
            let data = fs.readFileSync(this.prime_file, PRIME_FILE_TYPE),
                fname = Crypto.AES.decrypt(data, this.node_priv);
            return path.join(this.index_dir, fname + INDEX_FILE_EXT);
        } catch (error) {
            console.debug(error);
            throw Error("Prime-Index Missing/Corrupted");
        }
    }
}
function initialize() {
    /* Initialize the sink-share index storage:
       1. Recover existing index data (if any) via the prime + index files.
       2. Wipe stale dummy files and write a fresh batch of dummies with the
          real (encrypted) index hidden among them at a random slot.
       3. Re-point the prime file at the new index file and delete the old one.
       Resolves with a status message; rejects with a short failure reason. */
    return new Promise((resolve, reject) => {
        fs.readFile(_.prime_file, PRIME_FILE_TYPE, (err, res) => {
            var data, cur_filename, new_filename, priv_key;
            try {
                priv_key = _.node_priv;
            } catch (error) {
                return reject(error);
            }
            if (!err) {
                if (res.length) { //prime file not empty
                    try {
                        cur_filename = Crypto.AES.decrypt(res, priv_key);
                    } catch (error) {
                        console.debug(error);
                        return reject("Prime file corrupted");
                    } try { //read data from index file
                        let tmp = fs.readFileSync(path.join(_.index_dir, cur_filename + INDEX_FILE_EXT), INDEX_FILE_TYPE);
                        tmp = Crypto.AES.decrypt(tmp, priv_key);
                        JSON.parse(tmp); //check if data is JSON parse-able
                        data = tmp;
                    } catch (error) {
                        console.debug(error);
                        return reject("Index file corrupted");
                    }
                }
            }
            try { //delete all old dummy files
                let files = fs.readdirSync(_.index_dir);
                for (const file of files)
                    if (!cur_filename || file !== cur_filename + INDEX_FILE_EXT) //keep the current index file for now
                        fs.unlinkSync(path.join(_.index_dir, file));
            } catch (error) {
                console.debug(error);
                return reject("Clear index directory failed");
            } try { //create files (dummy and new index file)
                let N = floCrypto.randInt(MIN_DUMMY_FILES, MAX_DUMMY_FILES),
                    k = floCrypto.randInt(0, N); //random slot for the real index file
                if (typeof data === 'undefined' || data.length == 0) //no existing data, initialize
                    data = JSON.stringify({});
                let data_size = data.length;
                for (let i = 0; i <= N; i++) {
                    let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH);
                    if (i == k) {
                        new_filename = f_name;
                        f_data = data;
                    } else { //dummy of comparable, randomized size (floored to an integer length)
                        let d_size = Math.floor(data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR));
                        f_data = floCrypto.randString(d_size, false);
                    }
                    f_data = Crypto.AES.encrypt(f_data, priv_key);
                    fs.writeFileSync(path.join(_.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE);
                }
            } catch (error) {
                console.debug(error);
                return reject("Index file creation failed");
            } try { //update prime file
                let en_filename = Crypto.AES.encrypt(new_filename, priv_key);
                fs.writeFileSync(_.prime_file, en_filename, PRIME_FILE_TYPE);
            } catch (error) {
                console.debug(error);
                return reject("Update prime file failed");
            }
            //BUGFIX: was fs.unlink(..., file, ...) with 'file' undefined (ReferenceError)
            if (cur_filename)
                fs.unlink(path.join(_.index_dir, cur_filename + INDEX_FILE_EXT), err => err ? console.debug(err) : null);
            resolve("Key management initiated");
        })
    })
}
function shuffle() {
    /* Re-shuffle the index directory: every file (real index and dummies alike)
       gets a fresh random name and freshly encrypted content, so the real index
       cannot be identified by name, size or modification time. */
    readIndexFile().then(data => {
        let new_filename, cur_filename = Crypto.AES.decrypt(fs.readFileSync(_.prime_file, PRIME_FILE_TYPE), _.node_priv);
        fs.readdir(_.index_dir, (err, files) => {
            if (err)
                return console.error(err);
            data = JSON.stringify(data);
            let data_size = data.length;
            for (let file of files) { //NOTE: readdir entries already include the extension
                let f_data, f_name = floCrypto.randString(INDEX_FILE_NAME_LENGTH);
                //BUGFIX: compare against the filename WITH extension (was never matching)
                if (file === cur_filename + INDEX_FILE_EXT) {
                    new_filename = f_name;
                    f_data = data;
                } else {
                    let d_size = Math.floor(data_size * (floCrypto.randInt(MIN_DUMMY_SIZE_MUL * SIZE_FACTOR, MAX_DUMMY_SIZE_MUL * SIZE_FACTOR) / SIZE_FACTOR));
                    f_data = floCrypto.randString(d_size, false);
                }
                f_data = Crypto.AES.encrypt(f_data, _.node_priv);
                //rename and rewrite the file
                //BUGFIX: source path was in args_dir with the extension appended twice
                fs.renameSync(path.join(_.index_dir, file), path.join(_.index_dir, f_name + INDEX_FILE_EXT));
                fs.writeFileSync(path.join(_.index_dir, f_name + INDEX_FILE_EXT), f_data, INDEX_FILE_TYPE);
            }
            //update prime file
            if (!new_filename)
                throw Error("Index file has not been renamed");
            let en_filename = Crypto.AES.encrypt(new_filename, _.node_priv);
            fs.writeFileSync(_.prime_file, en_filename, PRIME_FILE_TYPE);
        })
    }).catch(error => console.error(error))
}
function readIndexFile() {
    //Read the current index file, decrypt it with the node key and JSON-parse it.
    return new Promise((resolve, reject) => {
        fs.readFile(_.index_file, INDEX_FILE_TYPE, (err, raw) => {
            if (err) {
                console.debug(err);
                reject('Unable to read Index file');
                return;
            }
            let parsed;
            try {
                parsed = JSON.parse(Crypto.AES.decrypt(raw, _.node_priv));
            } catch {
                reject("Index file corrupted");
                return;
            }
            resolve(parsed);
        })
    })
}
function writeIndexFile(data) {
    //JSON-serialize, encrypt with the node key and persist the index data.
    return new Promise((resolve, reject) => {
        const payload = Crypto.AES.encrypt(JSON.stringify(data), _.node_priv);
        fs.writeFile(_.index_file, payload, INDEX_FILE_TYPE, (err) => {
            if (!err)
                return resolve("Updated Index file");
            console.debug(err);
            reject('Unable to write Index file');
        })
    })
}
function getShares(group, id, ignoreDiscarded = true) {
    /* Fetch the key-shares stored locally for a sink id.
       Index layout per id: [ref, num1, num2, ...] — first element is the batch
       reference, the rest are sinkShares SQL row numbers.
       Resolves { ref, shares } with the decrypted share strings.
       Rejects when the id is discarded (unless ignoreDiscarded is false),
       when the group/id is unknown, or when no shares are stored. */
    return new Promise((resolve, reject) => {
        checkIfDiscarded(id).then(result => {
            if (ignoreDiscarded && result != false)
                return reject("Trying to get share for discarded ID");
            readIndexFile().then(data => {
                if (!(group in data))
                    reject("Group not found in Index file");
                else if (!(id in data[group]))
                    reject("ID not found in Index file");
                else {
                    let [ref, ...indexes] = data[group][id];
                    if (!indexes.length) //guard: empty IN (?) would be a SQL error
                        return reject("No shares stored for ID");
                    DB.query("SELECT share FROM sinkShares WHERE num IN (?)", [indexes])
                        .then(result => resolve({ ref, shares: result.map(r => Crypto.AES.decrypt(r.share, _.node_priv)) }))
                        .catch(error => reject(error))
                }
            }).catch(error => reject(error))
        }).catch(error => reject(error))
    })
}
function storeShareAtRandom(share) {
    //Insert the (already encrypted) share under a random row number so that
    //storage order reveals nothing; retries with a new number on collision.
    return new Promise((resolve, reject) => {
        const num = floCrypto.randInt(INT_MIN, INT_MAX);
        DB.query("INSERT INTO sinkShares(num, share) VALUE (?)", [[num, share]])
            .then(result => resolve(result.insertId))
            .catch(error => {
                if (error.code !== "ER_DUP_ENTRY")
                    return reject(error);
                //row number already taken: try again with a different random number
                storeShareAtRandom(share).then(resolve, reject);
            })
    })
}
function addShare(group, id, ref, share) {
    /* Store a received key-share for a sink id.
       'ref' orders share batches: a higher ref supersedes (and deletes) the
       previous batch's shares; a lower ref is rejected as stale.
       Resolves with the SQL row number the share was stored under. */
    return new Promise((resolve, reject) => {
        checkIfDiscarded(id).then(result => {
            if (result != false)
                return reject("Trying to store share for discarded ID");
            readIndexFile().then(data => {
                if (!(group in data))
                    data[group] = {};
                if (!(id in data[group]))
                    data[group][id] = [ref];
                else if (ref < data[group][id][0])
                    return reject("reference is lower than current");
                else if (ref > data[group][id][0]) {
                    let old_shares = data[group][id];
                    data[group][id] = [ref];
                    old_shares.shift(); //drop the old ref; remaining entries are SQL row numbers
                    if (old_shares.length) //delete old shares (BUGFIX: SQL was missing the closing parenthesis)
                        DB.query("DELETE FROM sinkShares WHERE num IN (?)", [old_shares])
                            .then(_ => null).catch(error => console.error(error));
                }
                let encrypted_share = Crypto.AES.encrypt(share, _.node_priv);
                console.debug(ref, '|sinkID:', id, '|EnShare:', encrypted_share); //BUGFIX: was undefined 'sinkID'
                storeShareAtRandom(encrypted_share).then(i => {
                    data[group][id].push(i);
                    writeIndexFile(data).then(_ => resolve(i)).catch(error => reject(error));
                }).catch(error => reject(error))
            }).catch(error => reject(error))
        }).catch(error => reject(error))
    })
}
function generateShares(sinkKey, total_n, min_n) {
    //Split sinkKey into Shamir secret shares and group them SHARES_PER_NODE
    //per node, returning an array of total_n share-arrays (one per node).
    const allShares = floCrypto.createShamirsSecretShares(sinkKey, total_n * SHARES_PER_NODE, min_n * SHARES_PER_NODE * SHARE_THRESHOLD);
    const grouped = [];
    for (let n = 0; n < total_n; n++)
        grouped.push(allShares.splice(0, SHARES_PER_NODE));
    return grouped;
}
function getStoredList(group = null) {
    /* List the sink ids that have shares recorded in the index file.
       With a group: resolves an array of ids (rejects if the group is unknown).
       Without: resolves a { group: [ids...] } map over every group. */
    return new Promise((resolve, reject) => {
        readIndexFile().then(data => {
            if (group !== null) {
                if (group in data)
                    resolve(Object.keys(data[group])); //BUGFIX: was data.group (undefined)
                else
                    reject("Group not found in Index file");
            } else {
                let ids = {};
                for (let g in data)
                    ids[g] = Object.keys(data[g]); //BUGFIX: was data.group (undefined -> throws)
                resolve(ids);
            }
        }).catch(error => reject(error))
    })
}
function getDiscardedList() {
    //Resolves a { floID: discard_time } map of every discarded sink in SQL.
    return DB.query("SELECT floID, discard_time FROM discardedSinks")
        .then(result => {
            const map = {};
            for (const { floID, discard_time } of result)
                map[floID] = discard_time;
            return map;
        });
}
function checkIfDiscarded(id) {
    //Resolves the discard_time of the sink id if it was discarded, else false.
    return DB.query("SELECT discard_time FROM discardedSinks WHERE floID=?", [id])
        .then(result => result.length ? result[0].discard_time : false);
}
//Sink groups and chest
//Read-only group names exposed as getters (assignment throws in strict mode).
const sink_groups = {};
for (const [prop, value] of [
    ['EXCHANGE', 'exchange'],
    ['CONVERT', 'convert'],
    ['BLOCKCHAIN_BONDS', 'blockchain_bonds'],
    ['BOBS_FUND', 'bobs_fund']
])
    Object.defineProperty(sink_groups, prop, { get: () => value, enumerable: true });
//list to generate when starting exchange
Object.defineProperty(sink_groups, 'generate_list', {
    get: () => [sink_groups.EXCHANGE, sink_groups.CONVERT],
    enumerable: true
});
const sink_ids = {}, sink_chest = {
    /* In-memory chest mapping group -> { sinkID: privateKey | null }.
       A null value means the id is known but its key is not currently held. */
    reset() {
        for (let g in sink_ids)
            delete sink_ids[g];
    },
    set_id(group, id, value) {
        if (!(group in sink_ids))
            sink_ids[group] = {};
        sink_ids[group][id] = value;
    },
    rm_id(group, id) {
        return delete sink_ids[group][id];
    },
    get_id(group, id) {
        return sink_ids[group][id];
    },
    list(group) {
        return Object.keys(sink_ids[group] || {});
    },
    active_list(group) { //ids whose key is currently held (truthy value)
        let ids = [];
        if (group in sink_ids)
            for (let id in sink_ids[group])
                if (sink_ids[group][id])
                    ids.push(id);
        return ids;
    },
    includes(group, id) {
        return group in sink_ids ? (id in sink_ids[group]) : null;
    },
    isActive(group, id) {
        //BUGFIX: was testing 'id in sink_ids' (the top-level group map) instead of within the group
        return group in sink_ids ? (id in sink_ids[group] && Boolean(sink_ids[group][id])) : null;
    },
    pick(group) { //random id from the group; assumes floCrypto.randInt upper bound is exclusive — TODO confirm
        let ids = this.list(group),
            i = floCrypto.randInt(0, ids.length);
        return ids[i];
    },
    active_pick(group) {
        let ids = this.active_list(group),
            i = floCrypto.randInt(0, ids.length);
        return ids[i];
    },
    get_all() { //snapshot of every stored id: group -> [ids]
        let ids = {};
        for (let g in sink_ids)
            ids[g] = Object.keys(sink_ids[g])
        return ids;
    }
};
module.exports = {
init: initialize,
getShares,
addShare,
generateShares,
getStoredList,
getDiscardedList,
checkIfDiscarded,
set node_priv(key) {
_.node_priv = key;
},
get node_priv() {
return _.node_priv;
},
get node_id() {
return node_id;
},
get node_id() {
return node_pub;
},
get sink_groups() {
return sink_groups;
},
get sink_chest() {
return sink_chest;
}
}

View File

@ -13,6 +13,7 @@ global.btcOperator = require('../docs/scripts/btcOperator');
floGlobals.application = application;
})();
const keys = require('./keys');
const DB = require("./database");
const App = require('./app');
@ -178,17 +179,13 @@ module.exports = function startServer() {
console.error('Password not entered!');
process.exit(1);
}
global.myPrivKey = Crypto.AES.decrypt(_tmp, _pass);
global.myPubKey = floCrypto.getPubKeyHex(global.myPrivKey);
global.myFloID = floCrypto.getFloID(global.myPubKey);
if (!global.myFloID || !global.myPubKey || !global.myPrivKey)
throw "Invalid Keys";
keys.node_priv = Crypto.AES.decrypt(_tmp, _pass);
} catch (error) {
console.error('Unable to load private key!');
process.exit(1);
}
console.log("Logged in as", global.myFloID);
console.log("Logged in as", keys.node_id);
DB.connect(config["sql_user"], config["sql_pwd"], config["sql_db"], config["sql_host"]).then(result => {
app = new App(config['secret']);

View File

@ -1,14 +1,12 @@
'use strict';
const coupling = require('./coupling');
const price = require("./price");
const background = require('./background');
const DB = require("./database");
const blockchain = background.blockchain;
const blockchain = require('./blockchain');
const {
PERIOD_INTERVAL,
WAIT_TIME,
TRADE_HASH_PREFIX,
TRANSFER_HASH_PREFIX
} = require('./_constants')["market"];
@ -41,7 +39,7 @@ function getRateHistory(asset, duration) {
if (!asset || !assetList.includes(asset))
reject(INVALID(eCode.INVALID_TOKEN_NAME, `Invalid asset(${asset})`));
else
coupling.price.getHistory(asset, duration)
price.getHistory(asset, duration)
.then(result => resolve(result))
.catch(error => reject(error))
})
@ -469,58 +467,14 @@ function checkDistributor(floID, asset) {
})
}
//Periodic Process
function periodicProcess_start() {
periodicProcess_stop();
periodicProcess();
assetList.forEach(asset => coupling.initiate(asset, true));
coupling.price.storeHistory.start();
periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL);
}
function periodicProcess_stop() {
if (periodicProcess.instance !== undefined) {
clearInterval(periodicProcess.instance);
delete periodicProcess.instance;
}
coupling.stopAll();
coupling.price.storeHistory.stop();
};
var lastSyncBlockHeight = 0;
function periodicProcess() {
if (periodicProcess.timeout !== undefined) {
clearTimeout(periodicProcess.timeout);
delete periodicProcess.timeout;
}
if (!blockchain.chests.list.length)
return periodicProcess.timeout = setTimeout(periodicProcess, WAIT_TIME);
floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => {
if (lastSyncBlockHeight < result.blocks[0].height) {
lastSyncBlockHeight = result.blocks[0].height;
background.process();
console.log("Last Block :", lastSyncBlockHeight);
}
}).catch(error => console.error(error));
}
module.exports = {
login,
logout,
get rates() {
return coupling.price.currentRates;
return price.currentRates;
},
get priceCountDown() {
return coupling.price.lastTimes;
},
get chests() {
return blockchain.chests;
},
set chests(c) {
blockchain.chests = c;
return price.lastTimes;
},
addBuyOrder,
addSellOrder,
@ -539,18 +493,11 @@ module.exports = {
removeTag,
addDistributor,
removeDistributor,
periodicProcess: {
start: periodicProcess_start,
stop: periodicProcess_stop
},
set assetList(assets) {
assetList = assets;
background.assetList = assets;
},
get assetList() {
return assetList
},
set collectAndCall(fn) {
blockchain.collectAndCall = fn;
},
}
};