diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3977219b..8d4d1a1b 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -574,6 +574,122 @@ AddressService.prototype._getAddressTxidHistory = function(address, options, cal }; +AddressService.prototype._streamAddressSummary = function(address, options, streamer, callback) { + var self = this; + + options = options || {}; + options.start = options.start || 0; + options.end = options.end || 0xffffffff; + + options.from = options.from || 0; //TODO: check from/to options are working or not + options.to = options.to || 0xffffffff; + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + options.endHeightBuf = new Buffer(4); + options.endHeightBuf.writeUInt32BE(options.end); + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + //declare the queue to process tx data + + var q = async.queue(function(task, cb) { + + let {id, options} = task; + + if (id.height === 0xffffffff) { + + return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { + + if (err || !tx) { + return cb(err || new Error('Address Service: could not find tx: ' + id.txid)); + } + self._transaction.setTxMetaInfo(tx, options, cb); + + }); + + } + + self._transaction.getDetailedTransaction(id.txid, options, cb); + + }, 4); + + q.pause(); //pause and wait until queue is set + + function process_chunk(err, tx){ + + streamer(err, tx); + + if(err){ + q.kill(); + + return callback(); + } + + } + + async.waterfall([ + + // query the mempool for relevant txs for this address + function(next) { + + if (!options.queryMempool) { + return next(null, []); + } + + self._mempool.getTxidsByAddress(address, 'both', next); + }, + + // add the meta data such as input values, etc. + function(mempoolTxids, next) { + + if (mempoolTxids.length <= 0) { + return next(); + } + + mempoolTxids.map(id => q.push(id, process_chunk)); + next(); + }, + // stream the rest of the confirmed txids out of the address index + function(next) { + + var txIdTransformStream = new Transform({ objectMode: true }); + + txIdTransformStream._flush = function(cb) { + txIdTransformStream.emit('end'); + cb(); + }; + + txIdTransformStream.on('error', function(err) { + log.error('Address Service: txstream err: ' + err); + txIdTransformStream.unpipe(); + }); + + txIdTransformStream.on('end', function() { + q.resume(); + }); + + txIdTransformStream._transform = function(chunk, enc, cb) { + var txInfo = self._encoding.decodeAddressIndexKey(chunk); + q.push({ txid: txInfo.txid, height: txInfo.height }, process_chunk); + + cb(); + }; + + var txidStream = self._getTxidStream(address, options); + txidStream.pipe(txIdTransformStream); + + q.drain(next); + + } + ], callback); + +} + AddressService.prototype._removeBlock = function(block, callback) { var self = this;