From 0283be05db2e54b3d492dca95c0b93f2325d191f Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 27 Jan 2023 17:31:02 +0530 Subject: [PATCH 01/12] Limit max response size --- lib/services/address/index.js | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 84ba9da0..3977219b 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -16,7 +16,7 @@ var utils = require('../../utils'); var LRU = require('lru-cache'); var XXHash = require('xxhash'); - +const MAX_TX_QUERY_LIMIT = 1000; // See rationale about this cache at function getTxList(next) const TXID_LIST_CACHE_ITEMS = 250; // nr of items (this translates to: consecutive @@ -365,6 +365,11 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c utxoStream.on('data', function(data) { + if(results.length >= MAX_TX_QUERY_LIMIT) { //Max array limit reached, end response + utxoStream.emit('end'); + return; + } + var key = self._encoding.decodeUtxoIndexKey(data.key); var value = self._encoding.decodeUtxoIndexValue(data.value); @@ -551,6 +556,12 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal txIdTransformStream._transform = function(chunk, enc, callback) { var txInfo = self._encoding.decodeAddressIndexKey(chunk); + + if(results.length >= MAX_TX_QUERY_LIMIT) { //Max array limit reached, end response + txIdTransformStream.emit('end'); + return; + } + results.push({ txid: txInfo.txid, height: txInfo.height }); callback(); }; From e13bd5e3e605702fbe76e7df22cd41949872a7c1 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 27 Jan 2023 17:34:06 +0530 Subject: [PATCH 02/12] Adding _streamAddressSummary - Fn uses streamer to process data. Thus doesnt store the entire list of txid or tx details - streamer fn can process the tx data as required --- lib/services/address/index.js | 116 ++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3977219b..8d4d1a1b 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -574,6 +574,122 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal }; +AddressService.prototype._streamAddressSummary = function(address, options, streamer, callback) { + var self = this; + + options = options || {}; + options.start = options.start || 0; + options.end = options.end || 0xffffffff; + + options.from = options.from || 0; //TODO: check from/to options are working or not + options.to = options.to || 0xffffffff; + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + options.endHeightBuf = new Buffer(4); + options.endHeightBuf.writeUInt32BE(options.end); + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + //declare the queue to process tx data + + var q = async.queue(function(task, cb) { + + let {id, options} = task; + + if (id.height === 0xffffffff) { + + return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { + + if (err || !tx) { + return cb(err || new Error('Address Service: could not find tx: ' + id.txid)); + } + self._transaction.setTxMetaInfo(tx, options, cb); + + }); + + } + + self._transaction.getDetailedTransaction(id.txid, options, cb); + + }, 4); + + q.pause(); //pause and wait until queue is set + + function process_chunk(err, tx){ + + streamer(err, tx); + + if(err){ + q.kill(); + + return callback(); + } + + } + + async.waterfall([ + + // query the mempool for relevant txs for this address + function(next) { + + if (!options.queryMempool) { + return next(null, []); + } + + self._mempool.getTxidsByAddress(address, 'both', next); + }, + + // add the meta data such as input values, etc. + function(mempoolTxids, next) { + + if (mempoolTxids.length <= 0) { + return next(); + } + + mempoolTxids.map(id => q.push(id, process_chunk)); + next(); + }, + // stream the rest of the confirmed txids out of the address index + function(next) { + + var txIdTransformStream = new Transform({ objectMode: true }); + + txIdTransformStream._flush = function(cb) { + txIdTransformStream.emit('end'); + cb(); + }; + + txIdTransformStream.on('error', function(err) { + log.error('Address Service: txstream err: ' + err); + txIdTransformStream.unpipe(); + }); + + txIdTransformStream.on('end', function() { + q.resume(); + }); + + txIdTransformStream._transform = function(chunk, enc, cb) { + var txInfo = self._encoding.decodeAddressIndexKey(chunk); + q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk); + + cb(); + }; + + var txidStream = self._getTxidStream(address, options); + txidStream.pipe(txIdTransformStream); + + q.drain(next); + + } + ], callback); + +} + AddressService.prototype._removeBlock = function(block, callback) { var self = this; From 4472ed83943f83981872474328f14f7d9c5a14c7 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 27 Jan 2023 17:38:37 +0530 Subject: [PATCH 03/12] Changing fns to use _streamAddressSummary Functions updated: - getAddressHistory - getAddressSummary (old fns are kept as it is and renamed to __getAddressHistory and __getAddressSummary respectively) --- lib/services/address/index.js | 138 +++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 9 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 8d4d1a1b..45962983 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -67,8 +67,8 @@ AddressService.dependencies = [ // then I would pass back [tx1, tx2] in that order // // Instead of passing addresses, with from>0, options.cacheKey can be used to define the address set. -// -AddressService.prototype.getAddressHistory = function(addresses, options, callback) { +//(old one: non-optimized for large data) +AddressService.prototype.__getAddressHistory = function(addresses, options, callback) { var self = this; var cacheUsed = false; @@ -173,8 +173,62 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba }; +AddressService.prototype.getAddressHistory = function(addresses, options, streamer, callback) { + var self = this; + + options = options || {}; + options.from = options.from || 0; + options.to = options.to || 0xffffffff; + + if(!callback){ //if only 3 args, then streamer is callback + callback = streamer; + streamer = () => null; //NULL fn + } + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + if (_.isString(addresses)) { + addresses = [addresses]; + } + + var results = { + totalCount: 0, + items: [], + } + + async.eachLimit(addresses, 4, function(address, next) { + + self._streamAddressSummary(address, options, function(err, tx){ + + results.totalCount++; + + if(err) + return log.error(err); + + if(!options.txNotNeeded && results.items.length < MAX_TX_QUERY_LIMIT) + results.items.push(tx); + + streamer(null, tx); + + }, next); + + }, function(err) { + + if (err) { + return callback(err); + } + + //TODO: sorting of tx list (results.items) + callback(null, results); + + }) + +} // this is basically the same as _getAddressHistory apart from the summary -AddressService.prototype.getAddressSummary = function(address, options, callback) { +//(old one: non-optimized for large data) +AddressService.prototype.__getAddressSummary = function(address, options, callback) { var self = this; @@ -218,6 +272,67 @@ AddressService.prototype.getAddressSummary = function(address, options, callback }; +AddressService.prototype.getAddressSummary = function(address, options, streamer, callback) { + + var self = this; + + options = options || {}; + options.from = options.from || 0; + options.to = options.to || 0xffffffff; + options.txNotNeeded = true; //no need to store tx details in result + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + if(!callback){ //if only 3 args, then streamer is callback + callback = streamer; + streamer = () => null; //NULL fn + } + + var result = { + addrStr: address, + balance: 0, + balanceSat: 0, + totalReceived: 0, + totalReceivedSat: 0, + totalSent: 0, + totalSentSat: 0, + unconfirmedBalance: 0, + unconfirmedBalanceSat: 0, + unconfirmedTxApperances: 0, + txApperances: 0, + }; + + self._aggregateAddressSummaryResult(tx, address, result); + + self.getAddressHistory(address, options, function(err, tx) { + + if(err) + return log.error(err); + + if(tx) + self._aggregateAddressSummaryResult(tx, address, result); + + streamer(null, tx); + + }, function(err) { + + if (err) { + return callback(err); + } + + result.balance = Unit.fromSatoshis(result.balanceSat).toBTC(); + result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC(); + result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC(); + result.unconfirmedBalance = Unit.fromSatoshis(result.unconfirmedBalanceSat).toBTC(); + + callback(null, result); + + }); + +} + AddressService.prototype._setOutputResults = function(tx, address, result) { for(var j = 0; j < tx.outputs.length; j++) { @@ -260,14 +375,10 @@ AddressService.prototype._setInputResults = function(tx, address, result) { } }; -AddressService.prototype._getAddressSummaryResult = function(txs, address, result, options) { - +AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options){ + var self = this; - for(var i = 0; i < txs.length; i++) { - - var tx = txs[i]; - self._setOutputResults(tx, address, result); self._setInputResults(tx, address, result); @@ -278,6 +389,15 @@ AddressService.prototype._getAddressSummaryResult = function(txs, address, resul result.transactions.push(tx.txid()); } +} + +AddressService.prototype._getAddressSummaryResult = function(txs, address, result, options) { + + var self = this; + + for(var i = 0; i < txs.length; i++) { + var tx = txs[i]; + self._aggregateAddressSummaryResult(tx, address, result, options); } return result; From 2145fdb056f9bf008667f489dde00939575331c1 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 27 Jan 2023 18:07:38 +0530 Subject: [PATCH 04/12] pass 'express-ws' module to service setupRoutes - flosight-api uses 'express-ws' module for ws api calls --- lib/services/web/index.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/services/web/index.js b/lib/services/web/index.js index d9a5b48f..bd28a786 100644 --- a/lib/services/web/index.js +++ b/lib/services/web/index.js @@ -4,6 +4,7 @@ var fs = require('fs'); var http = require('http'); var https = require('https'); var express = require('express'); +var express_ws = require('express-ws'); var bodyParser = require('body-parser'); var socketio = require('socket.io'); var inherits = require('util').inherits; @@ -105,7 +106,7 @@ WebService.prototype.setupAllRoutes = function() { if(service.getRoutePrefix && service.setupRoutes) { this.app.use('/' + this.node.services[key].getRoutePrefix(), subApp); - this.node.services[key].setupRoutes(subApp, express); + this.node.services[key].setupRoutes(subApp, express, express_ws); } else { log.debug('No routes defined for: ' + key); } From 6c164993bf176300890c56d4ff88ac87440893c7 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Fri, 27 Jan 2023 22:33:21 +0530 Subject: [PATCH 05/12] Update package.json --- .gitignore | 1 + package.json | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index b4d08d78..47fb7864 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ coverage/* **/*.creator *.log *.tmp +*.tmp.* .DS_Store bin/florincoin* bin/SHA256SUMS.asc diff --git a/package.json b/package.json index 47ae3fe0..e39e9222 100644 --- a/package.json +++ b/package.json @@ -5,12 +5,12 @@ "node": ">=8.0.0" }, "author": "BitPay ", - "version": "5.0.8", + "version": "5.0.9-beta-rm", "main": "./index.js", - "repository": "git://github.com/oipwg/flocore-node.git", - "homepage": "https://github.com/oipwg/flocore-node", + "repository": "git://github.com/ranchimall/flocore-node.git", + "homepage": "https://github.com/ranchimall/flocore-node", "bugs": { - "url": "https://github.com/oipwg/flocore-node/issues" + "url": "https://github.com/ranchimall/flocore-node/issues" }, "bin": { "flocore-node": "./bin/flocore-node" @@ -36,13 +36,14 @@ "commander": "^2.8.1", "errno": "^0.1.4", "express": "^4.13.3", + "express-ws": "^5.0.2", "fcoin": "^1.1.4", "flocore-lib": "^0.15.2", "flocore-message": "^1.0.7", "flocore-p2p": "^5.0.0-beta.8", "florincoind-rpc": "0.7.1", - "flosight-api": "^5.0.0-beta.75", - "flosight-ui": "^5.0.0-beta.72", + "flosight-api": "github:ranchimall/flosight-api", + "flosight-ui": "github:ranchimall/flosight-ui", "leveldown": "^2.0.0", "levelup": "^2.0.0", "liftoff": "^2.2.0", From dbfe39991f6d80157c2588b7f95ff117125761dd Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sat, 28 Jan 2023 02:04:40 +0530 Subject: [PATCH 06/12] Fixed: Address-summary request not responding --- lib/services/address/index.js | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 45962983..8028da06 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -304,15 +304,13 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer txApperances: 0, }; - self._aggregateAddressSummaryResult(tx, address, result); - self.getAddressHistory(address, options, function(err, tx) { if(err) return log.error(err); if(tx) - self._aggregateAddressSummaryResult(tx, address, result); + self._aggregateAddressSummaryResult(tx, address, result, options); streamer(null, tx); @@ -717,9 +715,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre //declare the queue to process tx data - var q = async.queue(function(task, cb) { - - let {id, options} = task; + var q = async.queue(function(id, cb) { if (id.height === 0xffffffff) { @@ -790,6 +786,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }); txIdTransformStream.on('end', function() { + q.drain = next; q.resume(); }); @@ -802,9 +799,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre var txidStream = self._getTxidStream(address, options); txidStream.pipe(txIdTransformStream); - - q.drain(next); - + } ], callback); From 3fbcbbe7bcc2b793bd435257cdb153563bd55ded Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sat, 28 Jan 2023 20:18:23 +0530 Subject: [PATCH 07/12] Fixed: APIs giving incorrect data - Fixed: addr API giving decimals in satoshi values - Fixed: Incorrect balance, totalSent, totalReceived values returned in API calls (issue: duplication) - Fixed: incorrect totalCount value in addr API and duplication of tx list --- lib/services/address/index.js | 151 ++++++++++++++++++++++++++++++---- 1 file changed, 133 insertions(+), 18 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 8028da06..ce16a1f0 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -180,7 +180,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream options.from = options.from || 0; options.to = options.to || 0xffffffff; - if(!callback){ //if only 3 args, then streamer is callback + if(typeof callback !== 'function'){ //if only 3 args, then streamer is callback callback = streamer; streamer = () => null; //NULL fn } @@ -202,13 +202,15 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream self._streamAddressSummary(address, options, function(err, tx){ - results.totalCount++; - if(err) return log.error(err); - if(!options.txNotNeeded && results.items.length < MAX_TX_QUERY_LIMIT) - results.items.push(tx); + if(!options.txNotNeeded) { + let count = self._getOccurrenceCount(tx, address); + results.totalCount += 1 / count; //fix for duplication + if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array + results.items.push(tx); + } streamer(null, tx); @@ -220,7 +222,10 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream return callback(err); } - //TODO: sorting of tx list (results.items) + //sort items in desc block-height, then asc txid (if same height) + results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); + results.totalCount = parseInt(results.totalCount.toFixed()); + callback(null, results); }) @@ -285,7 +290,7 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer options.queryMempool = true; } - if(!callback){ //if only 3 args, then streamer is callback + if(typeof callback !== 'function'){ //if only 3 args, then streamer is callback callback = streamer; streamer = () => null; //NULL fn } @@ -320,6 +325,11 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer return callback(err); } + result.balanceSat = parseInt(result.balanceSat.toFixed()); + result.totalReceivedSat = parseInt(result.totalReceivedSat.toFixed()); + result.totalSentSat = parseInt(result.totalSentSat.toFixed()); + result.txApperances = parseInt(result.txApperances.toFixed()); + result.balance = Unit.fromSatoshis(result.balanceSat).toBTC(); result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC(); result.totalSent = Unit.fromSatoshis(result.totalSentSat).toBTC(); @@ -373,10 +383,13 @@ AddressService.prototype._setInputResults = function(tx, address, result) { } }; -AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options){ - +AddressService.prototype._getAddressSummaryResult = function(txs, address, result, options) { + var self = this; + for(var i = 0; i < txs.length; i++) { + var tx = txs[i]; + self._setOutputResults(tx, address, result); self._setInputResults(tx, address, result); @@ -387,20 +400,122 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result.transactions.push(tx.txid()); } -} - -AddressService.prototype._getAddressSummaryResult = function(txs, address, result, options) { - - var self = this; - - for(var i = 0; i < txs.length; i++) { - var tx = txs[i]; - self._aggregateAddressSummaryResult(tx, address, result, options); } return result; }; +AddressService.prototype._getOccurrenceCount = function(tx, address) { + let count = 0; + + for(var i = 0; i < tx.inputs.length; i++) { + + var input = tx.inputs[i]; + + if(utils.getAddress(input, this._network) === address) + count++; + + } + + for(var j = 0; j < tx.outputs.length; j++) { + + var output = tx.outputs[j]; + + if(utils.getAddress(output, this._network) === address) + count++; + + } + + return count; + +} + +AddressService.prototype._getOutputResults = function(tx, address) { + + let result = { value: 0, count:0 }; + + for(var j = 0; j < tx.outputs.length; j++) { + + var output = tx.outputs[j]; + + if (utils.getAddress(output, this._network) !== address) { + continue; + } + + result.value += output.value; + result.count++; + + } + + return result; + +}; + +AddressService.prototype._getInputResults = function(tx, address) { + + let result = { value: 0, count:0 }; + + for(var i = 0; i < tx.inputs.length; i++) { + + var input = tx.inputs[i]; + + if (utils.getAddress(input, this._network) !== address) { + continue; + } + + result.value += tx.__inputValues[i]; + result.count++; + + } + + return result; + +}; + +AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options){ + + var self = this; + + let output = self._getOutputResults(tx, address); + let input = self._getInputResults(tx, address); + + //Since tx with multiple (x) input/output occurances of address will invoke this fn x time(s), (as we are not storing txid and hence cannot check for duplications) + //we divide the values by x and aggregate it to result. + //eg. tx with 1 input, 1 output => x=1+1=2.... input_val = 2, output_val = 1. + //the values will be aggregated 2 times, hence, we divide values by x i.e, 2. + //now agg_input_val = 2/2 =1, agg_output_val = 1/2 =0.5 + //the fn ll be called x times, hence the total result will be, result=agg*x: input(1*2=2), output(0.5*2=1) + + let total_count = input.count + output.count; + let div_input_val = input.value / total_count, + div_output_val = output.value / total_count; + + //aggregate the result + result.txApperances += 1/total_count; + + result.totalReceivedSat += div_output_val; + result.balanceSat += div_output_val; + + result.totalSentSat += div_input_val; + result.balanceSat -= div_input_val; + + if(!tx.confirmations){ + result.unconfirmedTxApperances += 1/total_count; + result.unconfirmedBalanceSat += div_output_val; + result.unconfirmedBalanceSat -= div_input_val; + } + + if (!options.noTxList) { + if (!result.transactions) { + result.transactions = []; + } + let txid = tx.txid(); + if(!result.transactions.includes(txid) && result.transactions.length < MAX_TX_QUERY_LIMIT) //push txid only if its not in the array (list limit not maxed out) + result.transactions.push(txid); + } + +} + AddressService.prototype.getAddressUnspentOutputs = function(address, options, callback) { var self = this; From d9579853ad56cbc52619a5e70dbe6c6151eb4717 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 2 Feb 2023 17:25:21 +0530 Subject: [PATCH 08/12] Improve handling of duplicate tx query - temporarily store txids to ignore duplication - removed the queue pause() and resume() in _streamAddressSummary --- lib/services/address/index.js | 92 ++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index ce16a1f0..08de11c6 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -206,8 +206,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream return log.error(err); if(!options.txNotNeeded) { - let count = self._getOccurrenceCount(tx, address); - results.totalCount += 1 / count; //fix for duplication + results.totalCount++; if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array results.items.push(tx); } @@ -432,43 +431,35 @@ AddressService.prototype._getOccurrenceCount = function(tx, address) { AddressService.prototype._getOutputResults = function(tx, address) { - let result = { value: 0, count:0 }; + let value = 0; for(var j = 0; j < tx.outputs.length; j++) { var output = tx.outputs[j]; - if (utils.getAddress(output, this._network) !== address) { - continue; - } - - result.value += output.value; - result.count++; + if (utils.getAddress(output, this._network) === address) + value += output.value; } - return result; + return value; }; AddressService.prototype._getInputResults = function(tx, address) { - let result = { value: 0, count:0 }; + let value = 0; for(var i = 0; i < tx.inputs.length; i++) { var input = tx.inputs[i]; - if (utils.getAddress(input, this._network) !== address) { - continue; - } - - result.value += tx.__inputValues[i]; - result.count++; + if (utils.getAddress(input, this._network) === address) + value += tx.__inputValues[i]; } - return result; + return value; }; @@ -476,33 +467,22 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, var self = this; - let output = self._getOutputResults(tx, address); - let input = self._getInputResults(tx, address); - - //Since tx with multiple (x) input/output occurances of address will invoke this fn x time(s), (as we are not storing txid and hence cannot check for duplications) - //we divide the values by x and aggregate it to result. - //eg. tx with 1 input, 1 output => x=1+1=2.... input_val = 2, output_val = 1. - //the values will be aggregated 2 times, hence, we divide values by x i.e, 2. - //now agg_input_val = 2/2 =1, agg_output_val = 1/2 =0.5 - //the fn ll be called x times, hence the total result will be, result=agg*x: input(1*2=2), output(0.5*2=1) - - let total_count = input.count + output.count; - let div_input_val = input.value / total_count, - div_output_val = output.value / total_count; + let output_val = self._getOutputResults(tx, address); + let input_val = self._getInputResults(tx, address); //aggregate the result - result.txApperances += 1/total_count; + result.txApperances++; - result.totalReceivedSat += div_output_val; - result.balanceSat += div_output_val; + result.totalReceivedSat += output_val; + result.balanceSat += output_val; - result.totalSentSat += div_input_val; - result.balanceSat -= div_input_val; + result.totalSentSat += input_val; + result.balanceSat -= input_val; if(!tx.confirmations){ - result.unconfirmedTxApperances += 1/total_count; - result.unconfirmedBalanceSat += div_output_val; - result.unconfirmedBalanceSat -= div_input_val; + result.unconfirmedTxApperances++; + result.unconfirmedBalanceSat += output_val; + result.unconfirmedBalanceSat -= input_val; } if (!options.noTxList) { @@ -829,9 +809,22 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre } //declare the queue to process tx data + var tmpTxList = {}; //store processed txid temporarily to ignore duplication var q = async.queue(function(id, cb) { + //duplication finding + if(id.txid in tmpTxList){ + + tmpTxList[id.txid][0]++; + + if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid + delete tmpTxList[id.txid]; + + return cb(); + + } else tmpTxList[id.txid] = [1, null]; + if (id.height === 0xffffffff) { return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { @@ -849,9 +842,20 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }, 4); - q.pause(); //pause and wait until queue is set + //q.pause(); //pause and wait until queue is set (not needed) - function process_chunk(err, tx){ + function chunkCallback(err, tx){ + + if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value) + return; + + if(tx){ + let txid = tx.txid(); + tmpTxList[txid][1] = self._getOccurrenceCount(tx, address); + + if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid + delete tmpTxList[txid]; + } streamer(err, tx); @@ -882,7 +886,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre return next(); } - mempoolTxids.map(id => q.push(id, process_chunk)); + mempoolTxids.map(id => q.push(id, chunkCallback)); next(); }, // stream the rest of the confirmed txids out of the address index @@ -902,12 +906,12 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre txIdTransformStream.on('end', function() { q.drain = next; - q.resume(); + //q.resume(); //(not needed if not paused above) }); txIdTransformStream._transform = function(chunk, enc, cb) { var txInfo = self._encoding.decodeAddressIndexKey(chunk); - q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk); + q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback); cb(); }; From 3a75002efc62c6590dc67e988a3eefffd44340c2 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sat, 4 Feb 2023 20:29:41 +0530 Subject: [PATCH 09/12] API Query options - Fixed: option `start` to query from blockheight - Added option `after`: pass this option in API to get list after the given txid Note: If both `start` and `after` are given, then greater height will be used Note: invalid or unconfirmed txid cannot be used in `after` option and will be ignored --- lib/services/address/index.js | 48 ++++++++++++++++++++++++++--------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 08de11c6..0cf0d0fb 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -658,10 +658,19 @@ AddressService.prototype.stop = function(callback) { AddressService.prototype._getTxidStream = function(address, options) { - var start = this._encoding.encodeAddressIndexKey(address); + var start; + if(options.after) { + start = this._encoding.encodeAddressIndexKey(address, options.start, options.after, 0xffffffff, 1, 0xffffffff); //0xffffffff is for getting after the txid + } else { + start = this._encoding.encodeAddressIndexKey(address, options.start); + } + + var endHeightBuf = new Buffer(4); + endHeightBuf.writeUInt32BE(options.end); + var end = Buffer.concat([ start.slice(0, address.length + 4), - options.endHeightBuf, + endHeightBuf, new Buffer(new Array(83).join('f'), 'hex') ]); @@ -718,9 +727,6 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal var results = []; - options.endHeightBuf = new Buffer(4); - options.endHeightBuf.writeUInt32BE(options.end); - if (_.isUndefined(options.queryMempool)) { options.queryMempool = true; } @@ -801,13 +807,6 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre options.queryMempool = true; } - options.endHeightBuf = new Buffer(4); - options.endHeightBuf.writeUInt32BE(options.end); - - if (_.isUndefined(options.queryMempool)) { - options.queryMempool = true; - } - //declare the queue to process tx data var tmpTxList = {}; //store processed txid temporarily to ignore duplication @@ -889,6 +888,31 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre mempoolTxids.map(id => q.push(id, chunkCallback)); next(); }, + + function(next){ + + if(_.isUndefined(options.after)) { + return next(); + } + + self._transaction.getTransaction(id.txid, options, function(err, tx) { + + if(tx && tx.confirmations && tx.height >= options.start) { + + options.start = tx.height; + + } else { + + delete options.after; + + } + + next(); + + }); + + }, + // stream the rest of the confirmed txids out of the address index function(next) { From 774d830fffbaa9bf259a9b0c4c882ca1f8529f00 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sun, 5 Feb 2023 03:12:04 +0530 Subject: [PATCH 10/12] Improvements to API query options - Stop request-stream process when stop-flag is on - Changed: order of reading db to forward (so that continuity will be preserved with `after` option and ws api calls). And changes required for the same. - Deprecating options (from and to) in calls that are not supported - Added: Temporary support for from and to option in getAddressHistory --- lib/services/address/index.js | 107 ++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 0cf0d0fb..81bb9712 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -177,8 +177,8 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream var self = this; options = options || {}; - options.from = options.from || 0; - options.to = options.to || 0xffffffff; + //options.from = options.from || 0; //Deprecated, use `after` option + //options.to = options.to || 0xffffffff; //Deprecated, use `after` option if(typeof callback !== 'function'){ //if only 3 args, then streamer is callback callback = streamer; @@ -207,8 +207,10 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream if(!options.txNotNeeded) { results.totalCount++; + if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array - results.items.push(tx); + results.items.unshift(tx); //using unshift, so that recent tx (low) are at front + } streamer(null, tx); @@ -225,6 +227,13 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); results.totalCount = parseInt(results.totalCount.toFixed()); + //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) + if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { + options.from = options.from || 0; + options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + results.items = results.items.slice(options.from, options.to); + } + callback(null, results); }) @@ -258,7 +267,7 @@ AddressService.prototype.__getAddressSummary = function(address, options, callba txApperances: 0, }; - self.getAddressHistory(address, options, function(err, results) { + self.__getAddressHistory(address, options, function(err, results) { //old fn if (err) { return callback(err); @@ -281,8 +290,8 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer var self = this; options = options || {}; - options.from = options.from || 0; - options.to = options.to || 0xffffffff; + //options.from = options.from || 0; //Deprecated + //options.to = options.to || 0xffffffff; //Deprecated options.txNotNeeded = true; //no need to store tx details in result if (_.isUndefined(options.queryMempool)) { @@ -328,6 +337,8 @@ AddressService.prototype.getAddressSummary = function(address, options, streamer result.totalReceivedSat = parseInt(result.totalReceivedSat.toFixed()); result.totalSentSat = parseInt(result.totalSentSat.toFixed()); result.txApperances = parseInt(result.txApperances.toFixed()); + result.unconfirmedBalanceSat = parseInt(result.unconfirmedBalanceSat.toFixed()); + result.unconfirmedTxApperances = parseInt(result.unconfirmedTxApperances.toFixed()); result.balance = Unit.fromSatoshis(result.balanceSat).toBTC(); result.totalReceived = Unit.fromSatoshis(result.totalReceivedSat).toBTC(); @@ -463,7 +474,7 @@ AddressService.prototype._getInputResults = function(tx, address) { }; -AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options){ +AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, result, options) { var self = this; @@ -486,12 +497,21 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, } if (!options.noTxList) { + if (!result.transactions) { result.transactions = []; } + let txid = tx.txid(); - if(!result.transactions.includes(txid) && result.transactions.length < MAX_TX_QUERY_LIMIT) //push txid only if its not in the array (list limit not maxed out) - result.transactions.push(txid); + if(!result.transactions.includes(txid)) { //push txid only if its not in the array + + result.transactions.unshift(txid); //using unshift, so that recent tx (low confirmation) are at front + + if(result.transactions.length > MAX_TX_QUERY_LIMIT) + result.transactions.pop(); //pop the oldest tx in list (when list limit is maxed out) + + } + } } @@ -676,9 +696,10 @@ AddressService.prototype._getTxidStream = function(address, options) { var criteria = { gte: start, - lte: end, - reverse: true // txids stream from low confirmations to high confirmations + lte: end + //reverse: true // txids stream from low confirmations to high confirmations }; + //NOTE: commentted reverse to keep the order in asc when reading to preserve continuity when using `after` option // txid stream var txidStream = this._db.createKeyStream(criteria); @@ -690,6 +711,7 @@ AddressService.prototype._getTxidStream = function(address, options) { return txidStream; }; +//(used by old fn) AddressService.prototype._getAddressTxHistory = function(options, callback) { var self = this; @@ -718,6 +740,7 @@ AddressService.prototype._getAddressTxHistory = function(options, callback) { }; +//(used by old fn) AddressService.prototype._getAddressTxidHistory = function(address, options, callback) { var self = this; @@ -781,7 +804,9 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal return; } - results.push({ txid: txInfo.txid, height: txInfo.height }); + if(!results.some(r => r.txid == txInfo.txid)) //add txid to array only if its not already there + results.push({ txid: txInfo.txid, height: txInfo.height }); + callback(); }; @@ -800,8 +825,8 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre options.start = options.start || 0; options.end = options.end || 0xffffffff; - options.from = options.from || 0; //TODO: check from/to options are working or not - options.to = options.to || 0xffffffff; + //options.from = options.from || 0; //Deprecated, use `after` option + //options.to = options.to || 0xffffffff; //Deprecated, use `after` option if (_.isUndefined(options.queryMempool)) { options.queryMempool = true; @@ -858,7 +883,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre streamer(err, tx); - if(err){ + if(err || options.flag_stop){ q.kill(); return callback(); @@ -868,34 +893,14 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre async.waterfall([ - // query the mempool for relevant txs for this address - function(next) { - - if (!options.queryMempool) { - return next(null, []); - } - - self._mempool.getTxidsByAddress(address, 'both', next); - }, - - // add the meta data such as input values, etc. - function(mempoolTxids, next) { - - if (mempoolTxids.length <= 0) { - return next(); - } - - mempoolTxids.map(id => q.push(id, chunkCallback)); - next(); - }, - + //Find start height if `after` option is passed function(next){ if(_.isUndefined(options.after)) { return next(); } - self._transaction.getTransaction(id.txid, options, function(err, tx) { + self._transaction.getTransaction(options.after, options, function(err, tx) { if(tx && tx.confirmations && tx.height >= options.start) { @@ -934,6 +939,10 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }); txIdTransformStream._transform = function(chunk, enc, cb) { + + if(options.flag_stop)//stop data query + return txIdTransformStream.unpipe(); + var txInfo = self._encoding.decodeAddressIndexKey(chunk); q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback); @@ -943,7 +952,29 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre var txidStream = self._getTxidStream(address, options); txidStream.pipe(txIdTransformStream); - } + }, + + // query the mempool for relevant txs for this address + function(next) { + + if (!options.queryMempool) { + return next(null, []); + } + + self._mempool.getTxidsByAddress(address, 'both', next); + }, + + // add the meta data such as input values, etc. + function(mempoolTxids, next) { + + if (mempoolTxids.length <= 0) { + return next(); + } + + mempoolTxids.map(id => q.push(id, chunkCallback)); + next(); + }, + ], callback); } From 7409dbb77d50c8e80a98abcb9ca1365bcd0faf5f Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sun, 5 Feb 2023 19:03:11 +0530 Subject: [PATCH 11/12] Bug fixes - Fixed: incorrect data returned via `from` and `to` option - Fixed: Missing data due to unordered items in getAddressHistory - Fixed: callback invoked multiple items in _streamAddressSummary due to queue parallel limit - Fixed: Queue drain being invoked before mempool txs are pushed into queue --- lib/services/address/index.js | 36 +++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 81bb9712..88c627a4 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -189,6 +189,13 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream options.queryMempool = true; } + //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) + if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { + options.old_support = true; + options.from = options.from || 0; + options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + } + if (_.isString(addresses)) { addresses = [addresses]; } @@ -208,8 +215,15 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream if(!options.txNotNeeded) { results.totalCount++; - if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array + if(!results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array results.items.unshift(tx); //using unshift, so that recent tx (low) are at front + + if(results.items.length > MAX_TX_QUERY_LIMIT) { //remove items from array when overflown + results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); + let del_count = options.old_support ? results.items.length : results.items.length - MAX_TX_QUERY_LIMIT; + let start_index = options.old_support ? MAX_TX_QUERY_LIMIT : 0; + results.items.splice(start_index, del_count); + } } @@ -228,9 +242,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream results.totalCount = parseInt(results.totalCount.toFixed()); //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) - if( !_.isUndefined(options.from) || !_.isUndefined(options.to)) { - options.from = options.from || 0; - options.to = options.to || 0xffffffff; //Max value of to will actually be MAX_TX_QUERY_LIMIT + if(options.old_support) { results.items = results.items.slice(options.from, options.to); } @@ -870,7 +882,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre function chunkCallback(err, tx){ - if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value) + if(q.killed || (!err && !tx)) //no error or tx data (duplicate calls will have empty tx value) return; if(tx){ @@ -883,8 +895,10 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre streamer(err, tx); - if(err || options.flag_stop){ + if((err || options.flag_stop) && !q.killed){ + q.kill(); + q.killed = true; return callback(); } @@ -934,8 +948,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }); txIdTransformStream.on('end', function() { - q.drain = next; - //q.resume(); //(not needed if not paused above) + next(); }); txIdTransformStream._transform = function(chunk, enc, cb) { @@ -975,6 +988,13 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre next(); }, + //wait for queue to complete + function(next) { + + q.drain = () => next(); + + } + ], callback); } From bca4fe4f979703198f7e6767841bf9148f347b54 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Sun, 5 Feb 2023 23:01:06 +0530 Subject: [PATCH 12/12] Bug fix - Fixed: data inconsistency and continuity lost in chain querying of tx details - Fixed: Not getting response when query has no tx. (ie, either address has no tx, or using the most recent tx as the key in `after` option) --- lib/services/address/index.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 88c627a4..f58ccb6f 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -219,7 +219,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream results.items.unshift(tx); //using unshift, so that recent tx (low) are at front if(results.items.length > MAX_TX_QUERY_LIMIT) { //remove items from array when overflown - results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); + results.items.sort((a, b) => b.__height - a.__height || b.txid().localeCompare(a.txid())); let del_count = options.old_support ? results.items.length : results.items.length - MAX_TX_QUERY_LIMIT; let start_index = options.old_support ? MAX_TX_QUERY_LIMIT : 0; results.items.splice(start_index, del_count); @@ -238,7 +238,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream } //sort items in desc block-height, then asc txid (if same height) - results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); + results.items.sort((a, b) => b.__height - a.__height || b.txid().localeCompare(a.txid())); results.totalCount = parseInt(results.totalCount.toFixed()); //Quick support for `from` and `to` options (DEPRECATED! Not recommeded to use) @@ -991,7 +991,11 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre //wait for queue to complete function(next) { - q.drain = () => next(); + if(!q.started) //No tx in query + return next(); + + else + q.drain = () => next(); }