Adding _streamAddressSummary

- Fn uses streamer to process data. Thus doesnt store the entire list of txid or tx details
- streamer fn can process the tx data as required
This commit is contained in:
sairajzero 2023-01-27 17:34:06 +05:30
parent 0283be05db
commit e13bd5e3e6

View File

@ -574,6 +574,122 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal
};
AddressService.prototype._streamAddressSummary = function(address, options, streamer, callback) {
var self = this;
options = options || {};
options.start = options.start || 0;
options.end = options.end || 0xffffffff;
options.from = options.from || 0; //TODO: check from/to options are working or not
options.to = options.to || 0xffffffff;
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
options.endHeightBuf = new Buffer(4);
options.endHeightBuf.writeUInt32BE(options.end);
if (_.isUndefined(options.queryMempool)) {
options.queryMempool = true;
}
//declare the queue to process tx data
var q = async.queue(function(task, cb) {
let {id, options} = task;
if (id.height === 0xffffffff) {
return self._mempool.getMempoolTransaction(id.txid, function(err, tx) {
if (err || !tx) {
return cb(err || new Error('Address Service: could not find tx: ' + id.txid));
}
self._transaction.setTxMetaInfo(tx, options, cb);
});
}
self._transaction.getDetailedTransaction(id.txid, options, cb);
}, 4);
q.pause(); //pause and wait until queue is set
function process_chunk(err, tx){
streamer(err, tx);
if(err){
q.kill();
return callback();
}
}
async.waterfall([
// query the mempool for relevant txs for this address
function(next) {
if (!options.queryMempool) {
return next(null, []);
}
self._mempool.getTxidsByAddress(address, 'both', next);
},
// add the meta data such as input values, etc.
function(mempoolTxids, next) {
if (mempoolTxids.length <= 0) {
return next();
}
mempoolTxids.map(id => q.push(id, process_chunk));
next();
},
// stream the rest of the confirmed txids out of the address index
function(next) {
var txIdTransformStream = new Transform({ objectMode: true });
txIdTransformStream._flush = function(cb) {
txIdTransformStream.emit('end');
cb();
};
txIdTransformStream.on('error', function(err) {
log.error('Address Service: txstream err: ' + err);
txIdTransformStream.unpipe();
});
txIdTransformStream.on('end', function() {
q.resume();
});
txIdTransformStream._transform = function(chunk, enc, cb) {
var txInfo = self._encoding.decodeAddressIndexKey(chunk);
q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk);
cb();
};
var txidStream = self._getTxidStream(address, options);
txidStream.pipe(txIdTransformStream);
q.drain(next);
}
], callback);
}
AddressService.prototype._removeBlock = function(block, callback) {
var self = this;