From d3c568299488ace0e5fab24f59579d97585f8f8e Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 6 May 2022 23:43:44 +0530 Subject: [PATCH] Improved Triggers for coupling process - moved periodic-process start/stop to market.js - price.getRates() will update price only when updatePrice is true (ie, regular fn call wont update price unnecessarily) - Trigger coupling for asset on successful order placement - Return (or wait) if a coupling instance is running for asset - Reset timeout when price is updated and/or trade has happened. - Next coupling instance will be set for timeout upon completion of one instance - Stop all coupling instances on app pause (i.e., node is not master) --- src/app.js | 24 +--------- src/coupling.js | 121 ++++++++++++++++++++++++++++++++---------------- src/market.js | 45 +++++++++++------- src/price.js | 17 ++++--- src/request.js | 3 +- 5 files changed, 122 insertions(+), 88 deletions(-) diff --git a/src/app.js b/src/app.js index cafdda0..68989eb 100644 --- a/src/app.js +++ b/src/app.js @@ -4,10 +4,6 @@ const express = require('express'); //const sessions = require('express-session'); const Request = require('./request'); -const { - PERIOD_INTERVAL -} = require("./_constants")["app"]; - module.exports = function App(secret, DB) { if (!(this instanceof App)) @@ -98,7 +94,6 @@ module.exports = function App(secret, DB) { Request.secret = secret; //Properties - var periodInstance = null; let self = this; //return server, express-app @@ -132,23 +127,8 @@ module.exports = function App(secret, DB) { }); //(Node is not master) Pause serving the clients - self.pause = () => { - Request.pause(); - if (periodInstance !== null) { - clearInterval(periodInstance); - periodInstance = null; - } - } - + self.pause = () => Request.pause(); //(Node is master) Resume serving the clients - self.resume = () => { - Request.resume(); - Request.periodicProcess(); - if (periodInstance === null) - periodInstance = setInterval(Request.periodicProcess, PERIOD_INTERVAL); - } + self.resume = () => Request.resume(); - Object.defineProperty(self, "periodInstance", { - get: () => periodInstance - }); } \ No newline at end of file diff --git a/src/coupling.js b/src/coupling.js index febdf98..0cfdc05 100644 --- a/src/coupling.js +++ b/src/coupling.js @@ -3,6 +3,7 @@ const price = require("./price"); const { + WAIT_TIME, TRADE_HASH_PREFIX } = require("./_constants")["market"]; @@ -12,10 +13,30 @@ const updateBalance = {}; updateBalance.consume = (floID, token, amount) => ["UPDATE UserBalance SET quantity=quantity-? WHERE floID=? AND token=?", [amount, floID, token]]; updateBalance.add = (floID, token, amount) => ["INSERT INTO UserBalance (floID, token, quantity) VALUE (?, ?, ?) ON DUPLICATE KEY UPDATE quantity=quantity+?", [floID, token, amount, amount]]; -function startCouplingForAsset(asset) { - price.getRates(asset).then(cur_rate => { +const couplingInstance = {}, + couplingTimeout = {}; + +function stopAllInstance() { + for (let asset in couplingTimeout) { + if (couplingTimeout[asset]) + clearTimeout(couplingTimeout[asset]); + delete couplingInstance[asset]; + delete couplingTimeout[asset]; + } +} + +function startCouplingForAsset(asset, updatePrice = false) { + if (couplingInstance[asset] === true) { //if coupling is already running for asset + if (updatePrice) { //wait until current instance is over + if (couplingTimeout[asset]) clearTimeout(couplingTimeout[asset]); + couplingTimeout[asset] = setTimeout(() => startCouplingForAsset(asset, true), WAIT_TIME); + } + return; + } + price.getRates(asset, updatePrice).then(cur_rate => { cur_rate = cur_rate.toFixed(8); - processCoupling(asset, cur_rate); + couplingInstance[asset] = true; //set instance as running + recursiveCoupling(asset, cur_rate, updatePrice); }).catch(error => console.error(error)); } @@ -69,45 +90,64 @@ const getBestBuyer = (asset, cur_rate) => new Promise((resolve, reject) => { }).catch(error => reject(error)) }); -function processCoupling(asset, cur_rate) { - getBestPair(asset, cur_rate).then(best => { - //console.debug("Sell:", best.sell); - //console.debug("Buy:", best.buy); - let quantity = Math.min(best.buy.quantity, best.sell.quantity, best.sell.chip_quantity); - let txQueries = processOrders(best.sell, best.buy, asset, cur_rate, quantity); - //begin audit - beginAudit(best.sell.floID, best.buy.floID, asset, cur_rate, quantity).then(audit => { - //process txn query in SQL - DB.transaction(txQueries).then(_ => { - console.log(`Transaction was successful! BuyOrder:${best.buy.id}| SellOrder:${best.sell.id}`); - audit.end(); - price.updateLastTime(); - //Since a tx was successful, match again - processCoupling(asset, cur_rate); - }).catch(error => console.error(error)); - }).catch(error => console.error(error)); +function recursiveCoupling(asset, cur_rate, flag = false) { + processCoupling(asset, cur_rate).then(result => { + console.log(result); + if (couplingInstance[asset] === true) + recursiveCoupling(asset, cur_rate, true); }).catch(error => { - let noBuy, noSell; - if (error.buy === undefined) - noBuy = false; - else if (error.buy !== null) { - console.error(error.buy); - noBuy = null; - } else { - console.log("No valid buyOrders for Asset:", asset); - noBuy = true; + //noBuy = error[0], noSell = error[1], reason = error[2] + price.noOrder(asset, error[0], error[1]); + console.error(error[2]); + //set timeout for next coupling (if not order placement occurs) + if (flag) { + price.updateLastTime(asset); + if (couplingInstance[asset] === true && flag) { + //if price was updated and/or trade happened, reset timer + if (couplingTimeout[asset]) clearTimeout(couplingTimeout[asset]); + couplingTimeout[asset] = setTimeout(() => startCouplingForAsset(asset, true), price.MIN_TIME); + } } - if (error.sell === undefined) - noSell = false; - else if (error.sell !== null) { - console.error(error.sell); - noSell = null; - } else { - console.log("No valid sellOrders for Asset:", asset); - noSell = true; - } - price.noOrder(asset, noBuy, noSell); - }); + delete couplingInstance[asset]; + }) +} + +function processCoupling(asset, cur_rate) { + return new Promise((resolve, reject) => { + getBestPair(asset, cur_rate).then(best => { + //console.debug("Sell:", best.sell); + //console.debug("Buy:", best.buy); + let quantity = Math.min(best.buy.quantity, best.sell.quantity, best.sell.chip_quantity); + let txQueries = processOrders(best.sell, best.buy, asset, cur_rate, quantity); + //begin audit + beginAudit(best.sell.floID, best.buy.floID, asset, cur_rate, quantity).then(audit => { + //process txn query in SQL + DB.transaction(txQueries).then(_ => { + audit.end(); + resolve(`Transaction was successful! BuyOrder:${best.buy.id}| SellOrder:${best.sell.id}`) + }).catch(error => reject([null, null, error])); + }).catch(error => reject([null, null, error])); + }).catch(error => { + let noBuy, noSell; + if (error.buy === undefined) + noBuy = false; + else if (error.buy === null) + noBuy = true; + else { + console.error(error.buy); + noBuy = null; + } + if (error.sell === undefined) + noSell = false; + else if (error.sell === null) + noSell = true; + else { + console.error(error.sell); + noSell = null; + } + reject([noBuy, noSell, `No valid ${noSell? 'sellOrders': ''} | ${noBuy? 'buyOrders': ''} for Asset: ${asset}`]); + }); + }) } function processOrders(seller_best, buyer_best, asset, cur_rate, quantity) { @@ -208,6 +248,7 @@ function auditBalance(sellerID, buyerID, asset) { module.exports = { initiate: startCouplingForAsset, + stopAll: stopAllInstance, updateBalance, price, set DB(db) { diff --git a/src/market.js b/src/market.js index d734711..6055498 100644 --- a/src/market.js +++ b/src/market.js @@ -3,15 +3,14 @@ const coupling = require('./coupling'); const { + PERIOD_INTERVAL, + WAIT_TIME, + LAUNCH_SELLER_TAG, MAXIMUM_LAUNCH_SELL_CHIPS, TRADE_HASH_PREFIX, TRANSFER_HASH_PREFIX } = require('./_constants')["market"]; -const LAUNCH_SELLER_TAG = "launch-seller"; - -const MINI_PERIOD_INTERVAL = require('./_constants')['app']['PERIOD_INTERVAL'] / 10; - const updateBalance = coupling.updateBalance; var DB, assetList; //container for database and allowed assets @@ -135,9 +134,10 @@ function addSellOrder(floID, asset, quantity, min_price) { return reject(INVALID(`Invalid asset (${asset})`)); getAssetBalance.check(floID, asset, quantity).then(_ => { checkSellRequirement(floID, asset, quantity).then(_ => { - DB.query("INSERT INTO SellOrder(floID, asset, quantity, minPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, min_price]) - .then(result => resolve('Sell Order placed successfully')) - .catch(error => reject(error)); + DB.query("INSERT INTO SellOrder(floID, asset, quantity, minPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, min_price]).then(result => { + resolve('Sell Order placed successfully'); + coupling.initiate(asset); + }).catch(error => reject(error)); }).catch(error => reject(error)) }).catch(error => reject(error)); }); @@ -168,9 +168,10 @@ function addBuyOrder(floID, asset, quantity, max_price) { else if (!assetList.includes(asset)) return reject(INVALID(`Invalid asset (${asset})`)); getAssetBalance.check(floID, floGlobals.currency, quantity * max_price).then(_ => { - DB.query("INSERT INTO BuyOrder(floID, asset, quantity, maxPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, max_price]) - .then(result => resolve('Buy Order placed successfully')) - .catch(error => reject(error)); + DB.query("INSERT INTO BuyOrder(floID, asset, quantity, maxPrice) VALUES (?, ?, ?, ?)", [floID, asset, quantity, max_price]).then(result => { + resolve('Buy Order placed successfully'); + coupling.initiate(asset); + }).catch(error => reject(error)); }).catch(error => reject(error)); }); } @@ -332,7 +333,6 @@ function confirmDepositFLO() { confirmDepositFLO.addSellChipsIfLaunchSeller(req.floID, amount).then(txQueries => { txQueries.push(updateBalance.add(req.floID, "FLO", amount)); txQueries.push(["UPDATE InputFLO SET status=?, amount=? WHERE id=?", ["SUCCESS", amount, req.id]]); - console.debug(txQueries) DB.transaction(txQueries) .then(result => console.debug("FLO deposited:", req.floID, amount)) .catch(error => console.error(error)) @@ -656,16 +656,30 @@ function checkDistributor(floID, asset) { function periodicProcess() { blockchainReCheck(); - assetList.forEach(asset => coupling.initiate(asset)); } +periodicProcess.start = function() { + periodicProcess.stop(); + periodicProcess(); + assetList.forEach(asset => coupling.initiate(asset)); + periodicProcess.instance = setInterval(periodicProcess, PERIOD_INTERVAL); +}; + +periodicProcess.stop = function() { + if (periodicProcess.instance !== undefined) { + clearInterval(periodicProcess.instance); + delete periodicProcess.instance; + } + coupling.stopAll(); +}; + function blockchainReCheck() { - if (blockchainReCheck.timeout) { + if (blockchainReCheck.timeout !== undefined) { clearTimeout(blockchainReCheck.timeout); - blockchainReCheck.timeout = null; + delete blockchainReCheck.timeout; } if (!global.sinkID) - return blockchainReCheck.timeout = setTimeout(blockchainReCheck, MINI_PERIOD_INTERVAL); + return blockchainReCheck.timeout = setTimeout(blockchainReCheck, WAIT_TIME); confirmDepositFLO(); confirmDepositToken(); @@ -674,7 +688,6 @@ function blockchainReCheck() { confirmWithdrawalFLO(); confirmWithdrawalToken(); } -blockchainReCheck.timeout = null; module.exports = { login, diff --git a/src/price.js b/src/price.js index e004ed5..9afc1f3 100644 --- a/src/price.js +++ b/src/price.js @@ -81,8 +81,8 @@ function loadRate(asset) { return new Promise((resolve, reject) => { if (typeof currentRate[asset] !== "undefined") return resolve(currentRate[asset]); - updateLastTime(asset); DB.query("SELECT rate FROM PriceHistory WHERE asset=? ORDER BY rec_time DESC LIMIT 1", [asset]).then(result => { + updateLastTime(asset); if (result.length) resolve(currentRate[asset] = result[0].rate); else @@ -136,12 +136,12 @@ fetchRates.USD_INR = function() { } */ -function getRates(asset) { +function getRates(asset, updatePrice = false) { return new Promise((resolve, reject) => { loadRate(asset).then(_ => { //console.debug(asset, currentRate[asset]); let cur_time = Date.now(); - if (cur_time - lastTime[asset] < MIN_TIME) //Minimum time to update not crossed: No update required + if (!updatePrice || cur_time - lastTime[asset] < MIN_TIME) //Minimum time to update not crossed: No update required resolve(currentRate[asset]); else if (noBuyOrder[asset] && noSellOrder[asset]) //Both are not available: No update required resolve(currentRate[asset]); @@ -152,10 +152,9 @@ function getRates(asset) { if (noBuyOrder[asset]) { //No Buy, But Sell available: Decrease the price let tmp_val = currentRate[asset] * (1 - DOWN_RATE); - if (tmp_val >= ratePast24hr * (1 - MAX_DOWN_PER_DAY)) { + if (tmp_val >= ratePast24hr * (1 - MAX_DOWN_PER_DAY)) currentRate[asset] = tmp_val; - updateLastTime(asset); - } else + else console.debug("Max Price down for the day has reached"); resolve(currentRate[asset]); } else if (noSellOrder[asset]) { @@ -163,10 +162,9 @@ function getRates(asset) { checkForRatedSellers(asset).then(result => { if (result) { let tmp_val = currentRate[asset] * (1 + UP_RATE); - if (tmp_val <= ratePast24hr * (1 + MAX_UP_PER_DAY)) { + if (tmp_val <= ratePast24hr * (1 + MAX_UP_PER_DAY)) currentRate[asset] = tmp_val; - updateLastTime(asset); - } else + else console.debug("Max Price up for the day has reached"); } }).catch(error => console.error(error)).finally(_ => resolve(currentRate[asset])); @@ -199,6 +197,7 @@ module.exports = { getRates, getHistory, updateLastTime, + MIN_TIME, noOrder(asset, buy, sell) { noBuyOrder[asset] = buy; noSellOrder[asset] = sell; diff --git a/src/request.js b/src/request.js index 28a186c..1eb0d06 100644 --- a/src/request.js +++ b/src/request.js @@ -490,7 +490,6 @@ module.exports = { WithdrawFLO, DepositToken, WithdrawToken, - periodicProcess: market.periodicProcess, AddUserTag, RemoveUserTag, AddDistributor, @@ -510,8 +509,10 @@ module.exports = { }, pause() { serving = false; + market.periodicProcess.stop(); }, resume() { serving = true; + market.periodicProcess.start(); } }; \ No newline at end of file