From d9579853ad56cbc52619a5e70dbe6c6151eb4717 Mon Sep 17 00:00:00 2001 From: sairajzero Date: Thu, 2 Feb 2023 17:25:21 +0530 Subject: [PATCH] Improve handling of duplicate tx query - temporarily store txids to ignore duplication - removed the queue pause() and resume() in _streamAddressSummary --- lib/services/address/index.js | 92 ++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index ce16a1f0..08de11c6 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -206,8 +206,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream return log.error(err); if(!options.txNotNeeded) { - let count = self._getOccurrenceCount(tx, address); - results.totalCount += 1 / count; //fix for duplication + results.totalCount++; if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array results.items.push(tx); } @@ -432,43 +431,35 @@ AddressService.prototype._getOccurrenceCount = function(tx, address) { AddressService.prototype._getOutputResults = function(tx, address) { - let result = { value: 0, count:0 }; + let value = 0; for(var j = 0; j < tx.outputs.length; j++) { var output = tx.outputs[j]; - if (utils.getAddress(output, this._network) !== address) { - continue; - } - - result.value += output.value; - result.count++; + if (utils.getAddress(output, this._network) === address) + value += output.value; } - return result; + return value; }; AddressService.prototype._getInputResults = function(tx, address) { - let result = { value: 0, count:0 }; + let value = 0; for(var i = 0; i < tx.inputs.length; i++) { var input = tx.inputs[i]; - if (utils.getAddress(input, this._network) !== address) { - continue; - } - - result.value += tx.__inputValues[i]; - result.count++; + if (utils.getAddress(input, this._network) === address) + value += tx.__inputValues[i]; } - return result; + return value; }; @@ -476,33 +467,22 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address, var self = this; - let output = self._getOutputResults(tx, address); - let input = self._getInputResults(tx, address); - - //Since tx with multiple (x) input/output occurances of address will invoke this fn x time(s), (as we are not storing txid and hence cannot check for duplications) - //we divide the values by x and aggregate it to result. - //eg. tx with 1 input, 1 output => x=1+1=2.... input_val = 2, output_val = 1. - //the values will be aggregated 2 times, hence, we divide values by x i.e, 2. - //now agg_input_val = 2/2 =1, agg_output_val = 1/2 =0.5 - //the fn ll be called x times, hence the total result will be, result=agg*x: input(1*2=2), output(0.5*2=1) - - let total_count = input.count + output.count; - let div_input_val = input.value / total_count, - div_output_val = output.value / total_count; + let output_val = self._getOutputResults(tx, address); + let input_val = self._getInputResults(tx, address); //aggregate the result - result.txApperances += 1/total_count; + result.txApperances++; - result.totalReceivedSat += div_output_val; - result.balanceSat += div_output_val; + result.totalReceivedSat += output_val; + result.balanceSat += output_val; - result.totalSentSat += div_input_val; - result.balanceSat -= div_input_val; + result.totalSentSat += input_val; + result.balanceSat -= input_val; if(!tx.confirmations){ - result.unconfirmedTxApperances += 1/total_count; - result.unconfirmedBalanceSat += div_output_val; - result.unconfirmedBalanceSat -= div_input_val; + result.unconfirmedTxApperances++; + result.unconfirmedBalanceSat += output_val; + result.unconfirmedBalanceSat -= input_val; } if (!options.noTxList) { @@ -829,9 +809,22 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre } //declare the queue to process tx data + var tmpTxList = {}; //store processed txid temporarily to ignore duplication var q = async.queue(function(id, cb) { + //duplication finding + if(id.txid in tmpTxList){ + + tmpTxList[id.txid][0]++; + + if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid + delete tmpTxList[id.txid]; + + return cb(); + + } else tmpTxList[id.txid] = [1, null]; + if (id.height === 0xffffffff) { return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { @@ -849,9 +842,20 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre }, 4); - q.pause(); //pause and wait until queue is set + //q.pause(); //pause and wait until queue is set (not needed) - function process_chunk(err, tx){ + function chunkCallback(err, tx){ + + if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value) + return; + + if(tx){ + let txid = tx.txid(); + tmpTxList[txid][1] = self._getOccurrenceCount(tx, address); + + if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid + delete tmpTxList[txid]; + } streamer(err, tx); @@ -882,7 +886,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre return next(); } - mempoolTxids.map(id => q.push(id, process_chunk)); + mempoolTxids.map(id => q.push(id, chunkCallback)); next(); }, // stream the rest of the confirmed txids out of the address index @@ -902,12 +906,12 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre txIdTransformStream.on('end', function() { q.drain = next; - q.resume(); + //q.resume(); //(not needed if not paused above) }); txIdTransformStream._transform = function(chunk, enc, cb) { var txInfo = self._encoding.decodeAddressIndexKey(chunk); - q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk); + q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback); cb(); };