improve handling of duplicate tx query

This commit is contained in:
sairajzero 2023-02-02 18:18:07 +05:30
parent c32a7cb57a
commit 8729078765

View File

@ -206,8 +206,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
return log.error(err); return log.error(err);
if(!options.txNotNeeded) { if(!options.txNotNeeded) {
let count = self._getOccurrenceCount(tx, address); results.totalCount++;
results.totalCount += 1 / count; //fix for duplication
if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array if(results.items.length < MAX_TX_QUERY_LIMIT && !results.items.some(x => x.txid() === tx.txid())) //push only if tx not already in array
results.items.push(tx); results.items.push(tx);
} }
@ -226,7 +225,6 @@ AddressService.prototype.getAddressHistory = function(addresses, options, stream
results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid())); results.items.sort((a, b) => b.__height - a.__height || a.txid().localeCompare(b.txid()));
results.totalCount = parseInt(results.totalCount.toFixed()); results.totalCount = parseInt(results.totalCount.toFixed());
//TODO: sorting of tx list (results.items)
callback(null, results); callback(null, results);
}) })
@ -433,43 +431,35 @@ AddressService.prototype._getOccurrenceCount = function(tx, address) {
AddressService.prototype._getOutputResults = function(tx, address) { AddressService.prototype._getOutputResults = function(tx, address) {
let result = { value: 0, count:0 }; let value = 0;
for(var j = 0; j < tx.outputs.length; j++) { for(var j = 0; j < tx.outputs.length; j++) {
var output = tx.outputs[j]; var output = tx.outputs[j];
if (utils.getAddress(output, this._network) !== address) { if (utils.getAddress(output, this._network) === address)
continue; value += output.value;
}
result.value += output.value;
result.count++;
} }
return result; return value;
}; };
AddressService.prototype._getInputResults = function(tx, address) { AddressService.prototype._getInputResults = function(tx, address) {
let result = { value: 0, count:0 }; let value = 0;
for(var i = 0; i < tx.inputs.length; i++) { for(var i = 0; i < tx.inputs.length; i++) {
var input = tx.inputs[i]; var input = tx.inputs[i];
if (utils.getAddress(input, this._network) !== address) { if (utils.getAddress(input, this._network) === address)
continue; value += tx.__inputValues[i];
}
result.value += tx.__inputValues[i];
result.count++;
} }
return result; return value;
}; };
@ -477,33 +467,22 @@ AddressService.prototype._aggregateAddressSummaryResult = function (tx, address,
var self = this; var self = this;
let output = self._getOutputResults(tx, address); let output_val = self._getOutputResults(tx, address);
let input = self._getInputResults(tx, address); let input_val = self._getInputResults(tx, address);
//Since tx with multiple (x) input/output occurances of address will invoke this fn x time(s), (as we are not storing txid and hence cannot check for duplications)
//we divide the values by x and aggregate it to result.
//eg. tx with 1 input, 1 output => x=1+1=2.... input_val = 2, output_val = 1.
//the values will be aggregated 2 times, hence, we divide values by x i.e, 2.
//now agg_input_val = 2/2 =1, agg_output_val = 1/2 =0.5
//the fn ll be called x times, hence the total result will be, result=agg*x: input(1*2=2), output(0.5*2=1)
let total_count = input.count + output.count;
let div_input_val = input.value / total_count,
div_output_val = output.value / total_count;
//aggregate the result //aggregate the result
result.txApperances += 1/total_count; result.txApperances++;
result.totalReceivedSat += div_output_val; result.totalReceivedSat += output_val;
result.balanceSat += div_output_val; result.balanceSat += output_val;
result.totalSentSat += div_input_val; result.totalSentSat += input_val;
result.balanceSat -= div_input_val; result.balanceSat -= input_val;
if(!tx.confirmations){ if(!tx.confirmations){
result.unconfirmedTxApperances += 1/total_count; result.unconfirmedTxApperances++;
result.unconfirmedBalanceSat += div_output_val; result.unconfirmedBalanceSat += output_val;
result.unconfirmedBalanceSat -= div_input_val; result.unconfirmedBalanceSat -= input_val;
} }
if (!options.noTxList) { if (!options.noTxList) {
@ -830,9 +809,23 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
} }
//declare the queue to process tx data //declare the queue to process tx data
var tmpTxList = {}; //store processed txid temporarily to ignore duplication
var q = async.queue(function(id, cb) { var q = async.queue(function(id, cb) {
//duplication finding
if(id.txid in tmpTxList){
tmpTxList[id.txid][0]++;
if(tmpTxList[id.txid][1] !== null && tmpTxList[id.txid][0] >= tmpTxList[id.txid][1]) //all duplications are found for this txid
delete tmpTxList[id.txid];
return cb();
} else tmpTxList[id.txid] = [1, null];
if (id.height === 0xffffffff) { if (id.height === 0xffffffff) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
@ -852,7 +845,18 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
q.pause(); //pause and wait until queue is set q.pause(); //pause and wait until queue is set
function process_chunk(err, tx){ function chunkCallback(err, tx){
if(!err && !tx) //no error or tx data (duplicate calls will have empty tx value)
return;
if(tx){
let txid = tx.txid();
tmpTxList[txid][1] = self._getOccurrenceCount(tx, address);
if(tmpTxList[txid][0] >= tmpTxList[txid][1]) //all duplications are found for this txid
delete tmpTxList[txid];
}
streamer(err, tx); streamer(err, tx);
@ -883,7 +887,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
return next(); return next();
} }
mempoolTxids.map(id => q.push(id, process_chunk)); mempoolTxids.map(id => q.push(id, chunkCallback));
next(); next();
}, },
// stream the rest of the confirmed txids out of the address index // stream the rest of the confirmed txids out of the address index
@ -908,7 +912,7 @@ AddressService.prototype._streamAddressSummary = function(address, options, stre
txIdTransformStream._transform = function(chunk, enc, cb) { txIdTransformStream._transform = function(chunk, enc, cb) {
var txInfo = self._encoding.decodeAddressIndexKey(chunk); var txInfo = self._encoding.decodeAddressIndexKey(chunk);
q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk); q.push({ txid: txInfo.txid, height: txInfo.height }, chunkCallback);
cb(); cb();
}; };