diff --git a/src/app.js b/src/app.js index 205c35b..f70cba0 100644 --- a/src/app.js +++ b/src/app.js @@ -114,8 +114,8 @@ module.exports = function App(secret, DB) { set: (assets) => Request.assetList = assets }); - Object.defineProperty(self, "chest", { - set: (chest) => Request.chest = chest + Object.defineProperty(self, "chests", { + set: (chests) => Request.chests = chests }); Object.defineProperty(self, "collectAndCall", { diff --git a/src/backup/head.js b/src/backup/head.js index 9412e36..374c10c 100644 --- a/src/backup/head.js +++ b/src/backup/head.js @@ -20,7 +20,7 @@ const SLAVE_MODE = 0, const sinkList = {}; -app.chest = { +app.chests = { get list() { return Object.keys(sinkList); }, @@ -125,24 +125,33 @@ function transferMoneyToNewSink(oldSinkID, oldSinkKey, newSink) { } */ -function collectAndCall(sinkID, callback) { +function collectAndCall(sinkID, callback, timeout = null) { if (!(callback instanceof Function)) throw Error("callback should be a function"); - if (sinkID in shares_collected) //if there is already a collect for sinkID, then just add the callback to queue - return shares_collected[sinkID].callbacks.push(callback); - - shares_collected[sinkID] = { - callbacks: [callback], - shares: {} - }; - for (let floID in connectedSlaves) - requestShare(connectedSlaves[floID], sinkID); - DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => { - if (result.length) - collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey)) - }).catch(error => console.error(error)) + if (!(sinkID in shares_collected)) { //if not already collecting shares for sinkID, then initiate collection + shares_collected[sinkID] = { + callbacks: [], + shares: {} + }; + for (let floID in connectedSlaves) + requestShare(connectedSlaves[floID], sinkID); + DB.query("SELECT share FROM sinkShares WHERE floID=?", [sinkID]).then(result => { + if (result.length) + collectShares(myFloID, sinkID, Crypto.AES.decrypt(result[0].share, global.myPrivKey)) + }).catch(error => console.error(error)) + } + shares_collected[sinkID].callbacks.push(callback); + if (timeout) + setTimeout(() => { + if (sinkID in shares_collected) { + let i = shares_collected[sinkID].callbacks.indexOf(callback); + delete shares_collected[sinkID].callbacks[i]; //deleting will empty the index, but space will be there so that order of other indexes are not affected + } + }, timeout); } +collectAndCall.isAlive = (sinkID, callbackRef) => (sinkID in shares_collected && shares_collected[sinkID].callbacks.indexOf(callbackRef) != -1); + function collectShares(floID, sinkID, share) { if (_mode !== MASTER_MODE) return console.warn("Not serving as master"); @@ -153,7 +162,7 @@ function collectShares(floID, sinkID, share) { let sinkKey = floCrypto.retrieveShamirSecret([].concat(...Object.values(shares_collected[sinkID].shares))); if (floCrypto.verifyPrivKey(sinkKey, sinkID)) { console.log("Shares collected successfully for", sinkID); - shares_collected[sinkID].callbacks.map(fn => fn(sinkKey)); + shares_collected[sinkID].callbacks.forEach(fn => fn instanceof Function ? fn(sinkKey) : null); delete shares_collected[sinkID]; } } catch { diff --git a/src/blockchain.js b/src/blockchain.js new file mode 100644 index 0000000..bf9e400 --- /dev/null +++ b/src/blockchain.js @@ -0,0 +1,130 @@ +'use strict'; + +var collectAndCall; //container for collectAndCall function from backup module +var chests; //container for blockchain ids (where assets are stored) + +const balance_locked = {}, + balance_cache = {}, + callbackCollection = { + FLO: {}, + token: {} + }; + +function getSinkID(amount, asset = "FLO", sinkList = null) { + return new Promise((resolve, reject) => { + if (!sinkList) + sinkList = chests.list.map(s => [s, s in balance_cache ? balance_cache[s][asset] || 0 : 0]) //TODO: improve sorting + .sort((a, b) => b[1] - a[1]).map(x => x[0]); + if (!sinkList.length) + return reject(`Insufficient balance in chests for asset(${asset})`); + let sinkID = sinkList.shift(); + (asset === "FLO" ? floBlockchainAPI.getBalance(sinkID) : floTokenAPI.getBalance(sinkID, asset)).then(balance => { + if (!(sinkID in balance_cache)) + balance_cache[sinkID] = {}; + balance_cache[sinkID][asset] = balance; + if (balance > (amount + (sinkID in balance_locked ? balance_locked[sinkID][asset] || 0 : 0))) + return resolve(sinkID); + else + getSinkID(amount, asset, sinkList) + .then(result => resolve(result)) + .catch(error => reject(error)) + }).catch(error => { + console.error(error); + getSinkID(amount, asset, sinkList) + .then(result => resolve(result)) + .catch(error => reject(error)) + }); + }) +} + +function sendFLO(floID, amount, id) { + getSinkID(amount).then(sinkID => { + let callback = (sinkKey) => { + //Send FLO to user via blockchain API + floBlockchainAPI.sendTx(sinkID, floID, amount, sinkKey, '(withdrawal from market)').then(txid => { + if (!txid) + throw Error("Transaction not successful"); + //Transaction was successful, Add in DB + DB.query("UPDATE OutputFLO SET status=?, txid=? WHERE id=?", ["WAITING_CONFIRMATION", txid, id]) + .then(_ => null).catch(error => console.error(error)); + }).catch(error => console.error(error)).finally(_ => { + delete callbackCollection.FLO[id]; + balance_locked[sinkID].FLO -= amount; + }); + } + collectAndCall(sinkID, callback); + callbackCollection.FLO[id] = callback; + if (!(sinkID in balance_locked)) + balance_locked[sinkID] = {}; + balance_locked[sinkID].FLO = (balance_locked[sinkID].FLO || 0) + amount; + }).catch(error => console.error(error)) +} + +function sendFLO_init(floID, amount) { + DB.query("INSERT INTO OutputFLO (floID, amount, status) VALUES (?, ?, ?)", [floID, amount, "PENDING"]) + .then(result => sendFLO(floID, amount, result.insertId)) + .catch(error => console.error(error)) +} + +function sendFLO_retry(floID, amount, id) { + if (id in callbackCollection.FLO) + console.debug("A callback is already pending for this FLO transfer"); + else + sendFLO(floID, amount, id); +} + +function sendToken(floID, token, amount, id) { + getSinkID(amount, token).then(sinkID => { + let callback = (sinkKey) => { + //Send Token to user via token API + floTokenAPI.sendToken(sinkKey, amount, floID, '(withdrawal from market)', token).then(txid => { + if (!txid) + throw Error("Transaction not successful"); + //Transaction was successful, Add in DB + DB.query("UPDATE OutputToken SET status=?, txid=? WHERE id=?", ["WAITING_CONFIRMATION", txid, id]) + .then(_ => null).catch(error => console.error(error)); + }).catch(error => console.error(error)).finally(_ => { + delete callbackCollection.token[id]; + balance_locked[sinkID][token] -= amount; + }); + } + collectAndCall(sinkID, callback); + callbackCollection.token[id] = callback; + if (!(sinkID in balance_locked)) + balance_locked[sinkID] = {}; + balance_locked[sinkID][token] = (balance_locked[sinkID][token] || 0) + amount; + }).catch(error => console.error(error)) +} + +function sendToken_init() { + DB.query("INSERT INTO OutputToken (floID, token, amount, status) VALUES (?, ?, ?, ?)", [floID, token, amount, "PENDING"]) + .then(result => sendToken(floID, amount, result.insertId)) + .catch(error => console.error(error)) +} + +function sendToken_retry(floID, token, amount, id) { + if (id in callbackCollection.token) + console.debug("A callback is already pending for this token transfer"); + else + sendToken(floID, token, amount, id); +} + +module.exports = { + set collectAndCall(fn) { + collectAndCall = fn; + }, + get chests() { + return chests; + }, + set chests(c) { + chests = c; + }, + sendFLO: { + init: sendFLO_init, + retry: sendFLO_retry + }, + sendToken: { + init: sendToken_init, + retry: sendToken_retry + } +} \ No newline at end of file diff --git a/src/market.js b/src/market.js index 31fc3f0..d322e66 100644 --- a/src/market.js +++ b/src/market.js @@ -390,9 +390,9 @@ confirmDepositFLO.checkTx = function(sender, txid) { return reject([false, "Transaction not included in any block yet"]); if (!tx.confirmations) return reject([false, "Transaction not confirmed yet"]); - let amount = tx.vout.reduce((a, v) => blockchain.chest.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); + let amount = tx.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); if (amount == 0) - return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestList loading from other nodes) + return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) else resolve(amount); }).catch(error => reject([false, error])) @@ -436,7 +436,7 @@ function withdrawFLO(floID, amount) { let txQueries = []; txQueries.push(updateBalance.consume(floID, "FLO", amount)); DB.transaction(txQueries).then(result => { - blockchain.sendFLO(floID, amount); + blockchain.sendFLO.init(floID, amount); resolve("Withdrawal request is in process"); }).catch(error => reject(error)); }).catch(error => reject(error)); @@ -445,7 +445,7 @@ function withdrawFLO(floID, amount) { function retryWithdrawalFLO() { DB.query("SELECT id, floID, amount FROM OutputFLO WHERE status=?", ["PENDING"]).then(results => { - results.forEach(req => blockchain.resendFLO(req.floID, req.amount)) + results.forEach(req => blockchain.sendFLO.retry(req.floID, req.amount, req.id)) }).catch(error => reject(error)); } @@ -527,9 +527,9 @@ confirmDepositToken.checkTx = function(sender, txid) { let vin_sender = tx.transactionDetails.vin.filter(v => v.addr === sender) if (!vin_sender.length) return reject([true, "Transaction not sent by the sender"]); - let amount_flo = tx.transactionDetails.vout.reduce((a, v) => blockchain.chest.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); + let amount_flo = tx.transactionDetails.vout.reduce((a, v) => blockchain.chests.includes(v.scriptPubKey.addresses[0]) ? a + v.value : a, 0); if (amount_flo == 0) - return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestList loading from other nodes) + return reject([true, "Transaction receiver is not market ID"]); //Maybe reject as false? (to compensate delay in chestsList loading from other nodes) else resolve([token_name, amount_token, amount_flo]); }).catch(error => reject([false, error])) @@ -553,7 +553,7 @@ function withdrawToken(floID, token, amount) { txQueries.push(updateBalance.consume(floID, token, amount)); DB.transaction(txQueries).then(result => { //Send Token to user via token API - blockchain.sendToken(floID, token, amount); + blockchain.sendToken.init(floID, token, amount); resolve("Withdrawal request is in process"); }).catch(error => reject(error)); }).catch(error => reject(error)); @@ -563,7 +563,7 @@ function withdrawToken(floID, token, amount) { function retryWithdrawalToken() { DB.query("SELECT id, floID, token, amount FROM OutputToken WHERE status=?", ["PENDING"]).then(results => { - results.forEach(req => blockchain.resendToken(req.floID, req.token, req.amount)); + results.forEach(req => blockchain.sendToken.retry(req.floID, req.token, req.amount, req.id)); }).catch(error => reject(error)); } @@ -669,7 +669,7 @@ function blockchainReCheck() { clearTimeout(blockchainReCheck.timeout); delete blockchainReCheck.timeout; } - if (!blockchain.chest.list.length) + if (!blockchain.chests.list.length) return blockchainReCheck.timeout = setTimeout(blockchainReCheck, WAIT_TIME); floBlockchainAPI.promisedAPI('api/blocks?limit=1').then(result => { @@ -695,11 +695,11 @@ module.exports = { get priceCountDown() { return coupling.price.lastTimes; }, - get chest() { - return blockchain.chest; + get chests() { + return blockchain.chests; }, - set chest(c) { - blockchain.chest = c; + set chests(c) { + blockchain.chests = c; }, addBuyOrder, addSellOrder, diff --git a/src/request.js b/src/request.js index 53bfe56..10da684 100644 --- a/src/request.js +++ b/src/request.js @@ -128,7 +128,7 @@ function Account(req, res) { timestamp: data.timestamp }, data.sign, data.floID, data.pubKey).then(req_str => { market.getAccountDetails(data.floID).then(result => { - result.sinkID = market.chest.pick; + result.sinkID = market.chests.pick; if (trustedIDs.includes(data.floID)) result.subAdmin = true; res.send(result); @@ -530,8 +530,8 @@ module.exports = { set assetList(assets) { market.assetList = assets; }, - set chest(c) { - market.chest = c; + set chests(c) { + market.chests = c; }, set collectAndCall(fn) { market.collectAndCall = fn;