diff --git a/lib/services/address/history.js b/lib/services/address/history.js index 61e77158..1795fede 100644 --- a/lib/services/address/history.js +++ b/lib/services/address/history.js @@ -2,6 +2,7 @@ var bitcore = require('bitcore-lib'); var async = require('async'); +var CombinedStream = require('./streams/combined'); var _ = bitcore.deps._; /** @@ -19,7 +20,6 @@ function AddressHistory(args) { } else { this.addresses = [args.addresses]; } - this.transactionInfo = []; this.combinedArray = []; this.detailedArray = []; } @@ -35,129 +35,47 @@ AddressHistory.prototype.get = function(callback) { var self = this; var totalCount; - async.eachLimit( - self.addresses, - AddressHistory.MAX_ADDRESS_QUERIES, - function(address, next) { - self.getTransactionInfo(address, next); - }, - function(err) { - if (err) { - return callback(err); - } - - self.combineTransactionInfo(); - totalCount = Number(self.combinedArray.length); - self.sortAndPaginateCombinedArray(); - - async.eachSeries( - self.combinedArray, - function(txInfo, next) { - self.getDetailedInfo(txInfo, next); - }, - function(err) { - if (err) { - return callback(err); - } - callback(null, { - totalCount: totalCount, - items: self.detailedArray - }); - } - ); - } - ); -}; - -/** - * This function will retrieve input and output information for an address - * and set the property `this.transactionInfo`. - * @param {String} address - A base58check encoded address - * @param {Function} next - */ -AddressHistory.prototype.getTransactionInfo = function(address, next) { - var self = this; - - var args = { - start: self.options.start, - end: self.options.end, - queryMempool: _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool - }; - - var outputs; - var inputs; - - async.parallel([ - function(done) { - self.node.services.address.getOutputs(address, args, function(err, result) { - if (err) { - return done(err); - } - outputs = result; - done(); - }); - }, - function(done) { - self.node.services.address.getInputs(address, args, function(err, result) { - if (err) { - return done(err); - } - inputs = result; - done(); - }); - } - ], function(err) { - if (err) { - return next(err); - } - self.transactionInfo = self.transactionInfo.concat(outputs, inputs); - next(); - }); -}; - -/** - * This function combines results from getInputs and getOutputs at - * `this.transactionInfo` to be "txid" unique at `this.combinedArray`. - */ -AddressHistory.prototype.combineTransactionInfo = function() { - var combinedArrayMap = {}; - this.combinedArray = []; - var l = this.transactionInfo.length; - for(var i = 0; i < l; i++) { - var item = this.transactionInfo[i]; - var mapKey = item.txid; - if (combinedArrayMap[mapKey] >= 0) { - var combined = this.combinedArray[combinedArrayMap[mapKey]]; - if (!combined.addresses[item.address]) { - combined.addresses[item.address] = { - outputIndexes: [], - inputIndexes: [] - }; - } - if (item.outputIndex >= 0) { - combined.satoshis += item.satoshis; - combined.addresses[item.address].outputIndexes.push(item.outputIndex); - } else if (item.inputIndex >= 0) { - combined.addresses[item.address].inputIndexes.push(item.inputIndex); - } - } else { - item.addresses = {}; - item.addresses[item.address] = { - outputIndexes: [], - inputIndexes: [] - }; - if (item.outputIndex >= 0) { - item.addresses[item.address].outputIndexes.push(item.outputIndex); - } else if (item.inputIndex >= 0) { - item.addresses[item.address].inputIndexes.push(item.inputIndex); - } - delete item.outputIndex; - delete item.inputIndex; - delete item.address; - this.combinedArray.push(item); - combinedArrayMap[mapKey] = this.combinedArray.length - 1; - } + // TODO: handle multiple addresses (restore previous functionality) + if (self.addresses.length > 1) { + return callback('Only single address queries supported currently'); } + + var address = self.addresses[0]; + + var combinedStream = new CombinedStream({ + inputStream: this.node.services.address.createInputsStream(address, this.options), + outputStream: this.node.services.address.createOutputsStream(address, this.options) + }); + + // Results from the transaction info stream are grouped into + // sets based on block height + combinedStream.on('data', function(block) { + self.combinedArray = self.combinedArray.concat(block); + }); + + combinedStream.on('end', function() { + totalCount = Number(self.combinedArray.length); + + self.sortAndPaginateCombinedArray(); + + // TODO: Add the mempool transactions + + async.eachSeries( + self.combinedArray, + function(txInfo, next) { + self.getDetailedInfo(txInfo, next); + }, + function(err) { + if (err) { + return callback(err); + } + callback(null, { + totalCount: totalCount, + items: self.detailedArray + }); + } + ); + }); }; /** diff --git a/lib/services/address/streams/combined.js b/lib/services/address/streams/combined.js new file mode 100644 index 00000000..050d8474 --- /dev/null +++ b/lib/services/address/streams/combined.js @@ -0,0 +1,166 @@ +'use strict'; + +var ReadableStream = require('stream').Readable; +var inherits = require('util').inherits; + +function TransactionInfoStream(options) { + ReadableStream.call(this, { + objectMode: true + }); + + // TODO: Be able to specify multiple input and output streams + // so that it's possible to query multiple addresses at the same time. + this._inputStream = options.inputStream; + this._outputStream = options.outputStream; + + // This holds a collection of combined inputs and outputs + // grouped into the matching block heights. + this._blocks = {}; + + this._inputCurrentHeight = 0; + this._outputCurrentHeight = 0; + this._inputFinishedHeights = []; + this._outputFinishedHeights = []; + this._inputEnded = false; + this._outputEnded = false; + + this._listenStreamEvents(); +} + +inherits(TransactionInfoStream, ReadableStream); + +TransactionInfoStream.prototype._listenStreamEvents = function() { + var self = this; + + self._inputStream.on('data', function(input) { + self._addToBlock(input); + if (input.height > self._inputCurrentHeight) { + self._inputFinishedHeights.push(input.height); + } + self._inputCurrentHeight = input.height; + self._maybePushBlock(); + }); + + self._outputStream.on('data', function(output) { + self._addToBlock(output); + if (output.height > self._outputCurrentHeight) { + self._outputFinishedHeights.push(output.height); + } + self._outputCurrentHeight = output.height; + self._maybePushBlock(); + }); + + self._inputStream.on('end', function() { + self._inputFinishedHeights.push(self._inputCurrentHeight); + self._inputEnded = true; + self._maybeEndStream(); + }); + + self._outputStream.on('end', function() { + self._outputFinishedHeights.push(self._outputCurrentHeight); + self._outputEnded = true; + self._maybeEndStream(); + }); + +}; + +TransactionInfoStream.prototype._read = function() { + this._inputStream.resume(); + this._outputStream.resume(); +}; + +TransactionInfoStream.prototype._addToBlock = function(data) { + if (!this._blocks[data.height]) { + this._blocks[data.height] = []; + } + this._blocks[data.height].push(data); +}; + +TransactionInfoStream.prototype._maybeEndStream = function() { + if (this._inputEnded && this._outputEnded) { + this._pushRemainingBlocks(); + this.push(null); + } +}; + +TransactionInfoStream.prototype._pushRemainingBlocks = function() { + var keys = Object.keys(this._blocks); + for (var i = 0; i < keys.length; i++) { + this.push(this._blocks[keys[i]]); + delete this._blocks[keys[i]]; + } +}; + +TransactionInfoStream.prototype._combineTransactionInfo = function(transactionInfo) { + var combinedArrayMap = {}; + var combinedArray = []; + var l = transactionInfo.length; + for(var i = 0; i < l; i++) { + var item = transactionInfo[i]; + var mapKey = item.txid; + if (combinedArrayMap[mapKey] >= 0) { + var combined = combinedArray[combinedArrayMap[mapKey]]; + if (!combined.addresses[item.address]) { + combined.addresses[item.address] = { + outputIndexes: [], + inputIndexes: [] + }; + } + if (item.outputIndex >= 0) { + combined.satoshis += item.satoshis; + combined.addresses[item.address].outputIndexes.push(item.outputIndex); + } else if (item.inputIndex >= 0) { + combined.addresses[item.address].inputIndexes.push(item.inputIndex); + } + } else { + item.addresses = {}; + item.addresses[item.address] = { + outputIndexes: [], + inputIndexes: [] + }; + if (item.outputIndex >= 0) { + item.addresses[item.address].outputIndexes.push(item.outputIndex); + } else if (item.inputIndex >= 0) { + item.addresses[item.address].inputIndexes.push(item.inputIndex); + } + delete item.outputIndex; + delete item.inputIndex; + delete item.address; + combinedArray.push(item); + combinedArrayMap[mapKey] = combinedArray.length - 1; + } + } + return combinedArray; +}; + +TransactionInfoStream.prototype._maybePushBlock = function() { + if (!this._inputFinishedHeights[0] && !this._outputFinishedHeights[0]) { + return; + } + + var inputFinished = this._inputFinishedHeights[0]; + var outputFinished = this._outputFinishedHeights[0]; + var bothFinished; + + if (inputFinished === outputFinished) { + bothFinished = inputFinished; + this._inputFinishedHeights.shift(); + this._outputFinishedHeights.shift(); + } else if (inputFinished <= outputFinished) { + bothFinished = inputFinished; + this._inputFinishedHeights.shift(); + } else if (outputFinished <= inputFinished) { + bothFinished = outputFinished; + this._outputFinishedHeights.shift(); + } + + if (bothFinished) { + var block = this._combineTransactionInfo(this._blocks[bothFinished]); + this.push(block); + delete this._blocks[bothFinished]; + //this._inputStream.pause(); + //this._outputStream.pause(); + } +}; + +module.exports = TransactionInfoStream;