Improved Triggers for coupling process

- moved periodic-process start/stop to market.js
- price.getRates() will update price only when updatePrice is true (ie, regular fn call wont update price unnecessarily)
- Trigger coupling for asset on successful order placement
- Return (or wait) if a coupling instance is running for asset
- Reset timeout when price is updated and/or trade has happened.
- Next coupling instance will be set for timeout upon completion of one instance
- Stop all coupling instances on app pause (i.e., node is not master)
This commit is contained in:
sairajzero 2022-05-06 23:43:44 +05:30
parent f8721ee4f0
commit d3c5682994
5 changed files with 122 additions and 88 deletions

View File

@ -4,10 +4,6 @@ const express = require('express');
//const sessions = require('express-session'); //const sessions = require('express-session');
const Request = require('./request'); const Request = require('./request');
const {
PERIOD_INTERVAL
} = require("./_constants")["app"];
module.exports = function App(secret, DB) { module.exports = function App(secret, DB) {
if (!(this instanceof App)) if (!(this instanceof App))
@ -98,7 +94,6 @@ module.exports = function App(secret, DB) {
Request.secret = secret; Request.secret = secret;
//Properties //Properties
var periodInstance = null;
let self = this; let self = this;
//return server, express-app //return server, express-app
@ -132,23 +127,8 @@ module.exports = function App(secret, DB) {
}); });
//(Node is not master) Pause serving the clients //(Node is not master) Pause serving the clients
self.pause = () => { self.pause = () => Request.pause();
Request.pause();
if (periodInstance !== null) {
clearInterval(periodInstance);
periodInstance = null;
}
}
//(Node is master) Resume serving the clients //(Node is master) Resume serving the clients
self.resume = () => { self.resume = () => Request.resume();
Request.resume();
Request.periodicProcess();
if (periodInstance === null)
periodInstance = setInterval(Request.periodicProcess, PERIOD_INTERVAL);
}
Object.defineProperty(self, "periodInstance", {
get: () => periodInstance
});
} }

View File

@ -3,6 +3,7 @@
const price = require("./price"); const price = require("./price");
const { const {
WAIT_TIME,
TRADE_HASH_PREFIX TRADE_HASH_PREFIX
} = require("./_constants")["market"]; } = require("./_constants")["market"];
@ -12,10 +13,30 @@ const updateBalance = {};
updateBalance.consume = (floID, token, amount) => ["UPDATE UserBalance SET quantity=quantity-? WHERE floID=? AND token=?", [amount, floID, token]]; updateBalance.consume = (floID, token, amount) => ["UPDATE UserBalance SET quantity=quantity-? WHERE floID=? AND token=?", [amount, floID, token]];
updateBalance.add = (floID, token, amount) => ["INSERT INTO UserBalance (floID, token, quantity) VALUE (?, ?, ?) ON DUPLICATE KEY UPDATE quantity=quantity+?", [floID, token, amount, amount]]; updateBalance.add = (floID, token, amount) => ["INSERT INTO UserBalance (floID, token, quantity) VALUE (?, ?, ?) ON DUPLICATE KEY UPDATE quantity=quantity+?", [floID, token, amount, amount]];
function startCouplingForAsset(asset) { const couplingInstance = {},
price.getRates(asset).then(cur_rate => { couplingTimeout = {};
function stopAllInstance() {
for (let asset in couplingTimeout) {
if (couplingTimeout[asset])
clearTimeout(couplingTimeout[asset]);
delete couplingInstance[asset];
delete couplingTimeout[asset];
}
}
function startCouplingForAsset(asset, updatePrice = false) {
if (couplingInstance[asset] === true) { //if coupling is already running for asset
if (updatePrice) { //wait until current instance is over
if (couplingTimeout[asset]) clearTimeout(couplingTimeout[asset]);
couplingTimeout[asset] = setTimeout(() => startCouplingForAsset(asset, true), WAIT_TIME);
}
return;
}
price.getRates(asset, updatePrice).then(cur_rate => {
cur_rate = cur_rate.toFixed(8); cur_rate = cur_rate.toFixed(8);
processCoupling(asset, cur_rate); couplingInstance[asset] = true; //set instance as running
recursiveCoupling(asset, cur_rate, updatePrice);
}).catch(error => console.error(error)); }).catch(error => console.error(error));
} }
@ -69,45 +90,64 @@ const getBestBuyer = (asset, cur_rate) => new Promise((resolve, reject) => {
}).catch(error => reject(error)) }).catch(error => reject(error))
}); });
function processCoupling(asset, cur_rate) { function recursiveCoupling(asset, cur_rate, flag = false) {
getBestPair(asset, cur_rate).then(best => { processCoupling(asset, cur_rate).then(result => {
//console.debug("Sell:", best.sell); console.log(result);
//console.debug("Buy:", best.buy); if (couplingInstance[asset] === true)
let quantity = Math.min(best.buy.quantity, best.sell.quantity, best.sell.chip_quantity); recursiveCoupling(asset, cur_rate, true);
let txQueries = processOrders(best.sell, best.buy, asset, cur_rate, quantity);
//begin audit
beginAudit(best.sell.floID, best.buy.floID, asset, cur_rate, quantity).then(audit => {
//process txn query in SQL
DB.transaction(txQueries).then(_ => {
console.log(`Transaction was successful! BuyOrder:${best.buy.id}| SellOrder:${best.sell.id}`);
audit.end();
price.updateLastTime();
//Since a tx was successful, match again
processCoupling(asset, cur_rate);
}).catch(error => console.error(error));
}).catch(error => console.error(error));
}).catch(error => { }).catch(error => {
let noBuy, noSell; //noBuy = error[0], noSell = error[1], reason = error[2]
if (error.buy === undefined) price.noOrder(asset, error[0], error[1]);
noBuy = false; console.error(error[2]);
else if (error.buy !== null) { //set timeout for next coupling (if not order placement occurs)
console.error(error.buy); if (flag) {
noBuy = null; price.updateLastTime(asset);
} else { if (couplingInstance[asset] === true && flag) {
console.log("No valid buyOrders for Asset:", asset); //if price was updated and/or trade happened, reset timer
noBuy = true; if (couplingTimeout[asset]) clearTimeout(couplingTimeout[asset]);
couplingTimeout[asset] = setTimeout(() => startCouplingForAsset(asset, true), price.MIN_TIME);
}
} }
if (error.sell === undefined) delete couplingInstance[asset];
noSell = false; })
else if (error.sell !== null) { }
console.error(error.sell);
noSell = null; function processCoupling(asset, cur_rate) {
} else { return new Promise((resolve, reject) => {
console.log("No valid sellOrders for Asset:", asset); getBestPair(asset, cur_rate).then(best => {
noSell = true; //console.debug("Sell:", best.sell);
} //console.debug("Buy:", best.buy);
price.noOrder(asset, noBuy, noSell); let quantity = Math.min(best.buy.quantity, best.sell.quantity, best.sell.chip_quantity);
}); let txQueries = processOrders(best.sell, best.buy, asset, cur_rate, quantity);
//begin audit
beginAudit(best.sell.floID, best.buy.floID, asset, cur_rate, quantity).then(audit => {
//process txn query in SQL
DB.transaction(txQueries).then(_ => {
audit.end();
resolve(`Transaction was successful! BuyOrder:${best.buy.id}| SellOrder:${best.sell.id}`)
}).catch(error => reject([null, null, error]));
}).catch(error => reject([null, null, error]));
}).catch(error => {
let noBuy, noSell;
if (error.buy === undefined)
noBuy = false;
else if (error.buy === null)
noBuy = true;
else {
console.error(error.buy);
noBuy = null;
}
if (error.sell === undefined)
noSell = false;
else if (error.sell === null)
noSell = true;
else {
console.error(error.sell);
noSell = null;
}
reject([noBuy, noSell, `No valid ${noSell? 'sellOrders': ''} | ${noBuy? 'buyOrders': ''} for Asset: ${asset}`]);
});
})
} }
function processOrders(seller_best, buyer_best, asset, cur_rate, quantity) { function processOrders(seller_best, buyer_best, asset, cur_rate, quantity) {
@ -208,6 +248,7 @@ function auditBalance(sellerID, buyerID, asset) {
module.exports = { module.exports = {
initiate: startCouplingForAsset, initiate: startCouplingForAsset,
stopAll: stopAllInstance,
updateBalance, updateBalance,
price, price,
set DB(db) { set DB(db) {

View File

@ -3,15 +3,14 @@
const coupling = require('./coupling'); const coupling = require('./coupling');
const { const {
PERIOD_INTERVAL,
WAIT_TIME,
LAUNCH_SELLER_TAG,
MAXIMUM_LAUNCH_SELL_CHIPS, MAXIMUM_LAUNCH_SELL_CHIPS,
TRADE_HASH_PREFIX, TRADE_HASH_PREFIX,
TRANSFER_HASH_PREFIX TRANSFER_HASH_PREFIX
} = require('./_constants')["market"]; } = require('./_constants')["market"];
const LAUNCH_SELLER_TAG = "launch-seller";
const MINI_PERIOD_INTERVAL = require('./_constants')['app']['PERIOD_INTERVAL'] / 10;
const updateBalance = coupling.updateBalance; const updateBalance = coupling.updateBalance;
var DB, assetList; //container for database and allowed assets var DB, assetList; //container for database and allowed assets
@ -135,9 +134,10 @@ function addSellOrder(floID, asset, quantity, min_price) {
return reject(INVALID(`Invalid asset (${asset})`)); return reject(INVALID(`Invalid asset (${asset})`));
getAssetBalance.check(floID, asset, quantity).then(_ => { getAssetBalance.check(floID, asset, quantity).then(_ => {
checkSellRequirement(floID, asset, quantity).then(_ => { checkSellRequirement(floID, asset, quantity).then(_ => {
DB.query("INSERT INTO SellOrder(floID, asset, quantity, minPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, min_price]) DB.query("INSERT INTO SellOrder(floID, asset, quantity, minPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, min_price]).then(result => {
.then(result => resolve('Sell Order placed successfully')) resolve('Sell Order placed successfully');
.catch(error => reject(error)); coupling.initiate(asset);
}).catch(error => reject(error));
}).catch(error => reject(error)) }).catch(error => reject(error))
}).catch(error => reject(error)); }).catch(error => reject(error));
}); });
@ -168,9 +168,10 @@ function addBuyOrder(floID, asset, quantity, max_price) {
else if (!assetList.includes(asset)) else if (!assetList.includes(asset))
return reject(INVALID(`Invalid asset (${asset})`)); return reject(INVALID(`Invalid asset (${asset})`));
getAssetBalance.check(floID, floGlobals.currency, quantity * max_price).then(_ => { getAssetBalance.check(floID, floGlobals.currency, quantity * max_price).then(_ => {
DB.query("INSERT INTO BuyOrder(floID, asset, quantity, maxPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, max_price]) DB.query("INSERT INTO BuyOrder(floID, asset, quantity, maxPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, max_price]).then(result => {
.then(result => resolve('Buy Order placed successfully')) resolve('Buy Order placed successfully');
.catch(error => reject(error)); coupling.initiate(asset);
}).catch(error => reject(error));
}).catch(error => reject(error)); }).catch(error => reject(error));
}); });
} }
@ -332,7 +333,6 @@ function confirmDepositFLO() {
confirmDepositFLO.addSellChipsIfLaunchSeller(req.floID, amount).then(txQueries => { confirmDepositFLO.addSellChipsIfLaunchSeller(req.floID, amount).then(txQueries => {
txQueries.push(updateBalance.add(req.floID, "FLO", amount)); txQueries.push(updateBalance.add(req.floID, "FLO", amount));
txQueries.push(["UPDATE InputFLO SET status=?, amount=? WHERE id=?", ["SUCCESS", amount, req.id]]); txQueries.push(["UPDATE InputFLO SET status=?, amount=? WHERE id=?", ["SUCCESS", amount, req.id]]);
console.debug(txQueries)
DB.transaction(txQueries) DB.transaction(txQueries)
.then(result => console.debug("FLO deposited:", req.floID, amount)) .then(result => console.debug("FLO deposited:", req.floID, amount))
.catch(error => console.error(error)) .catch(error => console.error(error))
@ -656,16 +656,30 @@ function checkDistributor(floID, asset) {
function periodicProcess() { function periodicProcess() {
blockchainReCheck(); blockchainReCheck();
assetList.forEach(asset => coupling.initiate(asset));
} }
periodicProcess.start = function() {
periodicProcess.stop();
periodicProcess();
assetList.forEach(asset => coupling.initiate(asset));
periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL);
};
periodicProcess.stop = function() {
if (periodicProcess.instance !== undefined) {
clearInterval(periodicProcess.instance);
delete periodicProcess.instance;
}
coupling.stopAll();
};
function blockchainReCheck() { function blockchainReCheck() {
if (blockchainReCheck.timeout) { if (blockchainReCheck.timeout !== undefined) {
clearTimeout(blockchainReCheck.timeout); clearTimeout(blockchainReCheck.timeout);
blockchainReCheck.timeout = null; delete blockchainReCheck.timeout;
} }
if (!global.sinkID) if (!global.sinkID)
return blockchainReCheck.timeout = setTimeout(blockchainReCheck, MINI_PERIOD_INTERVAL); return blockchainReCheck.timeout = setTimeout(blockchainReCheck, WAIT_TIME);
confirmDepositFLO(); confirmDepositFLO();
confirmDepositToken(); confirmDepositToken();
@ -674,7 +688,6 @@ function blockchainReCheck() {
confirmWithdrawalFLO(); confirmWithdrawalFLO();
confirmWithdrawalToken(); confirmWithdrawalToken();
} }
blockchainReCheck.timeout = null;
module.exports = { module.exports = {
login, login,

View File

@ -81,8 +81,8 @@ function loadRate(asset) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (typeof currentRate[asset] !== "undefined") if (typeof currentRate[asset] !== "undefined")
return resolve(currentRate[asset]); return resolve(currentRate[asset]);
updateLastTime(asset);
DB.query("SELECT rate FROM PriceHistory WHERE asset=? ORDER BY rec_time DESC LIMIT 1", [asset]).then(result => { DB.query("SELECT rate FROM PriceHistory WHERE asset=? ORDER BY rec_time DESC LIMIT 1", [asset]).then(result => {
updateLastTime(asset);
if (result.length) if (result.length)
resolve(currentRate[asset] = result[0].rate); resolve(currentRate[asset] = result[0].rate);
else else
@ -136,12 +136,12 @@ fetchRates.USD_INR = function() {
} }
*/ */
function getRates(asset) { function getRates(asset, updatePrice = false) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
loadRate(asset).then(_ => { loadRate(asset).then(_ => {
//console.debug(asset, currentRate[asset]); //console.debug(asset, currentRate[asset]);
let cur_time = Date.now(); let cur_time = Date.now();
if (cur_time - lastTime[asset] < MIN_TIME) //Minimum time to update not crossed: No update required if (!updatePrice || cur_time - lastTime[asset] < MIN_TIME) //Minimum time to update not crossed: No update required
resolve(currentRate[asset]); resolve(currentRate[asset]);
else if (noBuyOrder[asset] && noSellOrder[asset]) //Both are not available: No update required else if (noBuyOrder[asset] && noSellOrder[asset]) //Both are not available: No update required
resolve(currentRate[asset]); resolve(currentRate[asset]);
@ -152,10 +152,9 @@ function getRates(asset) {
if (noBuyOrder[asset]) { if (noBuyOrder[asset]) {
//No Buy, But Sell available: Decrease the price //No Buy, But Sell available: Decrease the price
let tmp_val = currentRate[asset] * (1 - DOWN_RATE); let tmp_val = currentRate[asset] * (1 - DOWN_RATE);
if (tmp_val >= ratePast24hr * (1 - MAX_DOWN_PER_DAY)) { if (tmp_val >= ratePast24hr * (1 - MAX_DOWN_PER_DAY))
currentRate[asset] = tmp_val; currentRate[asset] = tmp_val;
updateLastTime(asset); else
} else
console.debug("Max Price down for the day has reached"); console.debug("Max Price down for the day has reached");
resolve(currentRate[asset]); resolve(currentRate[asset]);
} else if (noSellOrder[asset]) { } else if (noSellOrder[asset]) {
@ -163,10 +162,9 @@ function getRates(asset) {
checkForRatedSellers(asset).then(result => { checkForRatedSellers(asset).then(result => {
if (result) { if (result) {
let tmp_val = currentRate[asset] * (1 + UP_RATE); let tmp_val = currentRate[asset] * (1 + UP_RATE);
if (tmp_val <= ratePast24hr * (1 + MAX_UP_PER_DAY)) { if (tmp_val <= ratePast24hr * (1 + MAX_UP_PER_DAY))
currentRate[asset] = tmp_val; currentRate[asset] = tmp_val;
updateLastTime(asset); else
} else
console.debug("Max Price up for the day has reached"); console.debug("Max Price up for the day has reached");
} }
}).catch(error => console.error(error)).finally(_ => resolve(currentRate[asset])); }).catch(error => console.error(error)).finally(_ => resolve(currentRate[asset]));
@ -199,6 +197,7 @@ module.exports = {
getRates, getRates,
getHistory, getHistory,
updateLastTime, updateLastTime,
MIN_TIME,
noOrder(asset, buy, sell) { noOrder(asset, buy, sell) {
noBuyOrder[asset] = buy; noBuyOrder[asset] = buy;
noSellOrder[asset] = sell; noSellOrder[asset] = sell;

View File

@ -490,7 +490,6 @@ module.exports = {
WithdrawFLO, WithdrawFLO,
DepositToken, DepositToken,
WithdrawToken, WithdrawToken,
periodicProcess: market.periodicProcess,
AddUserTag, AddUserTag,
RemoveUserTag, RemoveUserTag,
AddDistributor, AddDistributor,
@ -510,8 +509,10 @@ module.exports = {
}, },
pause() { pause() {
serving = false; serving = false;
market.periodicProcess.stop();
}, },
resume() { resume() {
serving = true; serving = true;
market.periodicProcess.start();
} }
}; };