Improve handling of duplicate tx query

- temporarily store txids to ignore duplication
- removed the queue pause() and resume() in _streamAddressSummary
This commit is contained in:
sairajzero 2023-02-02 17:25:21 +05:30
parent 3fbcbbe7bc
commit d9579853ad

View File

@ -206,8 +206,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
return log.error(err);
if(!options.txNotNeeded) {
let count = self._getOccurrenceCount(tx, address);
results.totalCount += 1 / count; //fix for duplication
results.totalCount++;
if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array
results.items.push(tx);
}
@ -432,43 +431,35 @@ AddressService.prototype._getOccurrenceCount = function(tx, address) {
AddressService.prototype._getOutputResults = function(tx, address) {
let result = { value: 0, count:0 };
let value = 0;
for(var j = 0; j < tx.outputs.length; j++) {
var output = tx.outputs[j];
if (utils.getAddress(output, this._network) !== address) {
continue;
}
result.value += output.value;
result.count++;
if (utils.getAddress(output, this._network) === address)
value += output.value;
}
return result;
return value;
};
AddressService.prototype._getInputResults = function(tx, address) {
let result = { value: 0, count:0 };
let value = 0;
for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i];
if (utils.getAddress(input, this._network) !== address) {
continue;
}
result.value += tx.__inputValues[i];
result.count++;
if (utils.getAddress(input, this._network) === address)
value += tx.__inputValues[i];
}
return result;
return value;
};
@ -476,33 +467,22 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address,
var self = this;
let output = self._getOutputResults(tx, address);
let input = self._getInputResults(tx, address);
//Since tx with multiple (x) input/output occurances of address will invoke this fn x time(s), (as we are not storing txid and hence cannot check for duplications)
//we divide the values by x and aggregate it to result.
//eg. tx with 1 input, 1 output => x=1+1=2.... input_val = 2, output_val = 1.
//the values will be aggregated 2 times, hence, we divide values by x i.e, 2.
//now agg_input_val = 2/2 =1, agg_output_val = 1/2 =0.5
//the fn ll be called x times, hence the total result will be, result=agg*x: input(1*2=2), output(0.5*2=1)
let total_count = input.count + output.count;
let div_input_val = input.value / total_count,
div_output_val = output.value / total_count;
let output_val = self._getOutputResults(tx, address);
let input_val = self._getInputResults(tx, address);
//aggregate the result
result.txApperances += 1/total_count;
result.txApperances++;
result.totalReceivedSat += div_output_val;
result.balanceSat += div_output_val;
result.totalReceivedSat += output_val;
result.balanceSat += output_val;
result.totalSentSat += div_input_val;
result.balanceSat -= div_input_val;
result.totalSentSat += input_val;
result.balanceSat -= input_val;
if(!tx.confirmations){
result.unconfirmedTxApperances += 1/total_count;
result.unconfirmedBalanceSat += div_output_val;
result.unconfirmedBalanceSat -= div_input_val;
result.unconfirmedTxApperances++;
result.unconfirmedBalanceSat += output_val;
result.unconfirmedBalanceSat -= input_val;
}
if (!options.noTxList) {
@ -829,9 +809,22 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
}
//declare the queue to process tx data
var tmpTxList = {}; //store processed txid temporarily to ignore duplication
var q = async.queue(function(id, cb) {
//duplication finding
if(id.txid in tmpTxList){
tmpTxList[id.txid][0]++;
if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid
delete tmpTxList[id.txid];
return cb();
} else tmpTxList[id.txid] = [1, null];
if (id.height === 0xffffffff) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
@ -849,9 +842,20 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
}, 4);
q.pause(); //pause and wait until queue is set
//q.pause(); //pause and wait until queue is set (not needed)
function process_chunk(err, tx){
function chunkCallback(err, tx){
if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value)
return;
if(tx){
let txid = tx.txid();
tmpTxList[txid][1] = self._getOccurrenceCount(tx, address);
if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid
delete tmpTxList[txid];
}
streamer(err, tx);
@ -882,7 +886,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
return next();
}
mempoolTxids.map(id => q.push(id, process_chunk));
mempoolTxids.map(id => q.push(id, chunkCallback));
next();
},
// stream the rest of the confirmed txids out of the address index
@ -902,12 +906,12 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
txIdTransformStream.on('end', function() {
q.drain = next;
q.resume();
//q.resume(); //(not needed if not paused above)
});
txIdTransformStream._transform = function(chunk, enc, cb) {
var txInfo = self._encoding.decodeAddressIndexKey(chunk);
q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk);
q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback);
cb();
};