Fix: invoking 'collectAndCall' for withdraws

This commit is contained in:
sairajzero 2022-09-26 01:34:26 +05:30
parent dd232c5a71
commit 1e8f2c17e7
5 changed files with 173 additions and 34 deletions

View File

@ -114,8 +114,8 @@ module.exports = function App(secret, DB) {
set: (assets) => Request.assetList = assets set: (assets) => Request.assetList = assets
}); });
Object.defineProperty(self, "chest", { Object.defineProperty(self, "chests", {
set: (chest) => Request.chest = chest set: (chests) => Request.chests = chests
}); });
Object.defineProperty(self, "collectAndCall", { Object.defineProperty(self, "collectAndCall", {

View File

@ -20,7 +20,7 @@ const SLAVE_MODE = 0,
const sinkList = {}; const sinkList = {};
app.chest = { app.chests = {
get list() { get list() {
return Object.keys(sinkList); return Object.keys(sinkList);
}, },
@ -125,14 +125,12 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) {
} }
*/ */
function collectAndCall(sinkID, callback) { function collectAndCall(sinkID, callback, timeout = null) {
if (!(callback instanceof Function)) if (!(callback instanceof Function))
throw Error("callback should be a function"); throw Error("callback should be a function");
if (sinkID in shares_collected) //if there is already a collect for sinkID, then just add the callback to queue if (!(sinkID in shares_collected)) { //if not already collecting shares for sinkID, then initiate collection
return shares_collected[sinkID].callbacks.push(callback);
shares_collected[sinkID] = { shares_collected[sinkID] = {
callbacks: [callback], callbacks: [],
shares: {} shares: {}
}; };
for (let floID in connectedSlaves) for (let floID in connectedSlaves)
@ -141,8 +139,19 @@ function collectAndCall(sinkID, callback) {
if (result.length) if (result.length)
collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey)) collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey))
}).catch(error => console.error(error)) }).catch(error => console.error(error))
}
shares_collected[sinkID].callbacks.push(callback);
if (timeout)
setTimeout(() => {
if (sinkID in shares_collected) {
let i = shares_collected[sinkID].callbacks.indexOf(callback);
delete shares_collected[sinkID].callbacks[i]; //deleting will empty the index, but space will be there so that order of other indexes are not affected
}
}, timeout);
} }
collectAndCall.isAlive = (sinkID, callbackRef) => (sinkID in shares_collected && shares_collected[sinkID].callbacks.indexOf(callbackRef) != -1);
function collectShares(floID, sinkID, share) { function collectShares(floID, sinkID, share) {
if (_mode !== MASTER_MODE) if (_mode !== MASTER_MODE)
return console.warn("Not serving as master"); return console.warn("Not serving as master");
@ -153,7 +162,7 @@ function collectShares(floID, sinkID, share) {
let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares))); let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares)));
if (floCrypto.verifyPrivKey(sinkKey, sinkID)) { if (floCrypto.verifyPrivKey(sinkKey, sinkID)) {
console.log("Shares collected successfully for", sinkID); console.log("Shares collected successfully for", sinkID);
shares_collected[sinkID].callbacks.map(fn => fn(sinkKey)); shares_collected[sinkID].callbacks.forEach(fn => fn instanceof Function ? fn(sinkKey) : null);
delete shares_collected[sinkID]; delete shares_collected[sinkID];
} }
} catch { } catch {

130
src/blockchain.js Normal file
View File

@ -0,0 +1,130 @@
'use strict';
var collectAndCall; //container for collectAndCall function from backup module
var chests; //container for blockchain ids (where assets are stored)
const balance_locked = {},
balance_cache = {},
callbackCollection = {
FLO: {},
token: {}
};
function getSinkID(amount, asset = "FLO", sinkList = null) {
return new Promise((resolve, reject) => {
if (!sinkList)
sinkList = chests.list.map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting
.sort((a, b) => b[1] - a[1]).map(x => x[0]);
if (!sinkList.length)
return reject(`Insufficient balance in chests for asset(${asset})`);
let sinkID = sinkList.shift();
(asset === "FLO" ? floBlockchainAPI.getBalance(sinkID) : floTokenAPI.getBalance(sinkID, asset)).then(balance => {
if (!(sinkID in balance_cache))
balance_cache[sinkID] = {};
balance_cache[sinkID][asset] = balance;
if (balance > (amount + (sinkID in balance_locked ? balance_locked[sinkID][asset] || 0 : 0)))
return resolve(sinkID);
else
getSinkID(amount, asset, sinkList)
.then(result => resolve(result))
.catch(error => reject(error))
}).catch(error => {
console.error(error);
getSinkID(amount, asset, sinkList)
.then(result => resolve(result))
.catch(error => reject(error))
});
})
}
function sendFLO(floID, amount, id) {
getSinkID(amount).then(sinkID => {
let callback = (sinkKey) => {
//Send FLO to user via blockchain API
floBlockchainAPI.sendTx(sinkID, floID, amount, sinkKey, '(withdrawal from market)').then(txid => {
if (!txid)
throw Error("Transaction not successful");
//Transaction was successful, Add in DB
DB.query("UPDATE OutputFLO SET status=?, txid=? WHERE id=?", ["WAITING_CONFIRMATION", txid, id])
.then(_ => null).catch(error => console.error(error));
}).catch(error => console.error(error)).finally(_ => {
delete callbackCollection.FLO[id];
balance_locked[sinkID].FLO -= amount;
});
}
collectAndCall(sinkID, callback);
callbackCollection.FLO[id] = callback;
if (!(sinkID in balance_locked))
balance_locked[sinkID] = {};
balance_locked[sinkID].FLO = (balance_locked[sinkID].FLO || 0) + amount;
}).catch(error => console.error(error))
}
function sendFLO_init(floID, amount) {
DB.query("INSERT INTO OutputFLO (floID, amount, status) VALUES (?, ?, ?)", [floID, amount, "PENDING"])
.then(result => sendFLO(floID, amount, result.insertId))
.catch(error => console.error(error))
}
function sendFLO_retry(floID, amount, id) {
if (id in callbackCollection.FLO)
console.debug("A callback is already pending for this FLO transfer");
else
sendFLO(floID, amount, id);
}
function sendToken(floID, token, amount, id) {
getSinkID(amount, token).then(sinkID => {
let callback = (sinkKey) => {
//Send Token to user via token API
floTokenAPI.sendToken(sinkKey, amount, floID, '(withdrawal from market)', token).then(txid => {
if (!txid)
throw Error("Transaction not successful");
//Transaction was successful, Add in DB
DB.query("UPDATE OutputToken SET status=?, txid=? WHERE id=?", ["WAITING_CONFIRMATION", txid, id])
.then(_ => null).catch(error => console.error(error));
}).catch(error => console.error(error)).finally(_ => {
delete callbackCollection.token[id];
balance_locked[sinkID][token] -= amount;
});
}
collectAndCall(sinkID, callback);
callbackCollection.token[id] = callback;
if (!(sinkID in balance_locked))
balance_locked[sinkID] = {};
balance_locked[sinkID][token] = (balance_locked[sinkID][token] || 0) + amount;
}).catch(error => console.error(error))
}
function sendToken_init() {
DB.query("INSERT INTO OutputToken (floID, token, amount, status) VALUES (?, ?, ?, ?)", [floID, token, amount, "PENDING"])
.then(result => sendToken(floID, amount, result.insertId))
.catch(error => console.error(error))
}
function sendToken_retry(floID, token, amount, id) {
if (id in callbackCollection.token)
console.debug("A callback is already pending for this token transfer");
else
sendToken(floID, token, amount, id);
}
module.exports = {
set collectAndCall(fn) {
collectAndCall = fn;
},
get chests() {
return chests;
},
set chests(c) {
chests = c;
},
sendFLO: {
init: sendFLO_init,
retry: sendFLO_retry
},
sendToken: {
init: sendToken_init,
retry: sendToken_retry
}
}

View File

@ -390,9 +390,9 @@ confirmDepositFLO.checkTx = function(sender, txid) {
return reject([false, "Transaction not included in any block yet"]); return reject([false, "Transaction not included in any block yet"]);
if (!tx.confirmations) if (!tx.confirmations)
return reject([false, "Transaction not confirmed yet"]); return reject([false, "Transaction not confirmed yet"]);
let amount = tx.vout.reduce((a, v) => blockchain.chest.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); let amount = tx.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
if (amount == 0) if (amount == 0)
return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestList loading from other nodes) return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes)
else else
resolve(amount); resolve(amount);
}).catch(error => reject([false, error])) }).catch(error => reject([false, error]))
@ -436,7 +436,7 @@ function withdrawFLO(floID, amount) {
let txQueries = []; let txQueries = [];
txQueries.push(updateBalance.consume(floID, "FLO", amount)); txQueries.push(updateBalance.consume(floID, "FLO", amount));
DB.transaction(txQueries).then(result => { DB.transaction(txQueries).then(result => {
blockchain.sendFLO(floID, amount); blockchain.sendFLO.init(floID, amount);
resolve("Withdrawal request is in process"); resolve("Withdrawal request is in process");
}).catch(error => reject(error)); }).catch(error => reject(error));
}).catch(error => reject(error)); }).catch(error => reject(error));
@ -445,7 +445,7 @@ function withdrawFLO(floID, amount) {
function retryWithdrawalFLO() { function retryWithdrawalFLO() {
DB.query("SELECT id, floID, amount FROM OutputFLO WHERE status=?", ["PENDING"]).then(results => { DB.query("SELECT id, floID, amount FROM OutputFLO WHERE status=?", ["PENDING"]).then(results => {
results.forEach(req => blockchain.resendFLO(req.floID, req.amount)) results.forEach(req => blockchain.sendFLO.retry(req.floID, req.amount, req.id))
}).catch(error => reject(error)); }).catch(error => reject(error));
} }
@ -527,9 +527,9 @@ confirmDepositToken.checkTx = function(sender, txid) {
let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender) let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender)
if (!vin_sender.length) if (!vin_sender.length)
return reject([true, "Transaction not sent by the sender"]); return reject([true, "Transaction not sent by the sender"]);
let amount_flo = tx.transactionDetails.vout.reduce((a, v) => blockchain.chest.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); let amount_flo = tx.transactionDetails.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0);
if (amount_flo == 0) if (amount_flo == 0)
return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestList loading from other nodes) return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes)
else else
resolve([token_name, amount_token, amount_flo]); resolve([token_name, amount_token, amount_flo]);
}).catch(error => reject([false, error])) }).catch(error => reject([false, error]))
@ -553,7 +553,7 @@ function withdrawToken(floID, token, amount) {
txQueries.push(updateBalance.consume(floID, token, amount)); txQueries.push(updateBalance.consume(floID, token, amount));
DB.transaction(txQueries).then(result => { DB.transaction(txQueries).then(result => {
//Send Token to user via token API //Send Token to user via token API
blockchain.sendToken(floID, token, amount); blockchain.sendToken.init(floID, token, amount);
resolve("Withdrawal request is in process"); resolve("Withdrawal request is in process");
}).catch(error => reject(error)); }).catch(error => reject(error));
}).catch(error => reject(error)); }).catch(error => reject(error));
@ -563,7 +563,7 @@ function withdrawToken(floID, token, amount) {
function retryWithdrawalToken() { function retryWithdrawalToken() {
DB.query("SELECT id, floID, token, amount FROM OutputToken WHERE status=?", ["PENDING"]).then(results => { DB.query("SELECT id, floID, token, amount FROM OutputToken WHERE status=?", ["PENDING"]).then(results => {
results.forEach(req => blockchain.resendToken(req.floID, req.token, req.amount)); results.forEach(req => blockchain.sendToken.retry(req.floID, req.token, req.amount, req.id));
}).catch(error => reject(error)); }).catch(error => reject(error));
} }
@ -669,7 +669,7 @@ function blockchainReCheck() {
clearTimeout(blockchainReCheck.timeout); clearTimeout(blockchainReCheck.timeout);
delete blockchainReCheck.timeout; delete blockchainReCheck.timeout;
} }
if (!blockchain.chest.list.length) if (!blockchain.chests.list.length)
return blockchainReCheck.timeout = setTimeout(blockchainReCheck, WAIT_TIME); return blockchainReCheck.timeout = setTimeout(blockchainReCheck, WAIT_TIME);
floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => { floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => {
@ -695,11 +695,11 @@ module.exports = {
get priceCountDown() { get priceCountDown() {
return coupling.price.lastTimes; return coupling.price.lastTimes;
}, },
get chest() { get chests() {
return blockchain.chest; return blockchain.chests;
}, },
set chest(c) { set chests(c) {
blockchain.chest = c; blockchain.chests = c;
}, },
addBuyOrder, addBuyOrder,
addSellOrder, addSellOrder,

View File

@ -128,7 +128,7 @@ function Account(req, res) {
timestamp: data.timestamp timestamp: data.timestamp
}, data.sign, data.floID, data.pubKey).then(req_str => { }, data.sign, data.floID, data.pubKey).then(req_str => {
market.getAccountDetails(data.floID).then(result => { market.getAccountDetails(data.floID).then(result => {
result.sinkID = market.chest.pick; result.sinkID = market.chests.pick;
if (trustedIDs.includes(data.floID)) if (trustedIDs.includes(data.floID))
result.subAdmin = true; result.subAdmin = true;
res.send(result); res.send(result);
@ -530,8 +530,8 @@ module.exports = {
set assetList(assets) { set assetList(assets) {
market.assetList = assets; market.assetList = assets;
}, },
set chest(c) { set chests(c) {
market.chest = c; market.chests = c;
}, },
set collectAndCall(fn) { set collectAndCall(fn) {
market.collectAndCall = fn; market.collectAndCall = fn;