diff --git a/lib/services/address/constants.js b/lib/services/address/constants.js index f11b3fe7..6ab3aba1 100644 --- a/lib/services/address/constants.js +++ b/lib/services/address/constants.js @@ -45,7 +45,7 @@ exports.SUMMARY_CACHE_THRESHOLD = 10000; // The default maximum length queries exports.MAX_INPUTS_QUERY_LENGTH = 50000; exports.MAX_OUTPUTS_QUERY_LENGTH = 50000; - +exports.MAX_HISTORY_QUERY_LENGTH = 1000; module.exports = exports; diff --git a/lib/services/address/encoding.js b/lib/services/address/encoding.js index ca42fc7f..2f83faab 100644 --- a/lib/services/address/encoding.js +++ b/lib/services/address/encoding.js @@ -181,8 +181,12 @@ exports.encodeSummaryCacheValue = function(cache, tipHeight) { buffer.writeDoubleBE(cache.result.totalReceived, 4); buffer.writeDoubleBE(cache.result.balance, 12); var txidBuffers = []; - for (var key in cache.result.appearanceIds) { - txidBuffers.push(new Buffer(key, 'hex')); + for (var i = 0; i < cache.result.txids.length; i++) { + var buf = new Buffer(new Array(36)); + var txid = cache.result.txids[i]; + buf.write(txid, 'hex'); + buf.writeUInt32BE(cache.result.appearanceIds[txid], 32); + txidBuffers.push(buf); } var txidsBuffer = Buffer.concat(txidBuffers); var value = Buffer.concat([buffer, txidsBuffer]); @@ -198,17 +202,21 @@ exports.decodeSummaryCacheValue = function(buffer) { // read 32 byte chunks until exhausted var appearanceIds = {}; - var pos = 16; + var txids = []; + var pos = 20; while(pos < buffer.length) { var txid = buffer.slice(pos, pos + 32).toString('hex'); - appearanceIds[txid] = true; - pos += 32; + var txidHeight = buffer.readUInt32BE(pos + 32); + txids.push(txid); + appearanceIds[txid] = txidHeight; + pos += 36; } var cache = { height: height, result: { appearanceIds: appearanceIds, + txids: txids, totalReceived: totalReceived, balance: balance, unconfirmedAppearanceIds: {}, // unconfirmed values are never stored in cache diff --git a/lib/services/address/history.js b/lib/services/address/history.js index 1795fede..dc15766c 100644 --- a/lib/services/address/history.js +++ b/lib/services/address/history.js @@ -2,9 +2,10 @@ var bitcore = require('bitcore-lib'); var async = require('async'); -var CombinedStream = require('./streams/combined'); var _ = bitcore.deps._; +var constants = require('./constants'); + /** * This represents an instance that keeps track of data over a series of * asynchronous I/O calls to get the transaction history for a group of @@ -20,7 +21,21 @@ function AddressHistory(args) { } else { this.addresses = [args.addresses]; } - this.combinedArray = []; + + this.maxHistoryQueryLength = constants.MAX_HISTORY_QUERY_LENGTH; + + this.addressStrings = []; + for (var i = 0; i < this.addresses.length; i++) { + var address = this.addresses[i]; + if (address instanceof bitcore.Address) { + this.addressStrings.push(address.toString()); + } else if (_.isString(address)) { + this.addressStrings.push(address); + } else { + throw new TypeError('Addresses are expected to be strings'); + } + } + this.detailedArray = []; } @@ -42,28 +57,33 @@ AddressHistory.prototype.get = function(callback) { var address = self.addresses[0]; - var combinedStream = new CombinedStream({ - inputStream: this.node.services.address.createInputsStream(address, this.options), - outputStream: this.node.services.address.createOutputsStream(address, this.options) - }); + this.node.services.address.getAddressSummary(address, this.options, function(err, summary) { + if (err) { + return callback(err); + } - // Results from the transaction info stream are grouped into - // sets based on block height - combinedStream.on('data', function(block) { - self.combinedArray = self.combinedArray.concat(block); - }); + totalCount = summary.txids.length; - combinedStream.on('end', function() { - totalCount = Number(self.combinedArray.length); + // TODO: Make sure txids are sorted by height and time + var fromOffset = summary.txids.length - self.options.from; + var toOffset = summary.txids.length - self.options.to; + var txids = summary.txids.slice(toOffset, fromOffset); - self.sortAndPaginateCombinedArray(); + // Verify that this query isn't too long + if (txids.length > self.maxHistoryQueryLength) { + return callback(new Error( + 'Maximum length query (' + self.maxAddressQueryLength + ') exceeded for addresses:' + + this.address.join(',') + )); + } - // TODO: Add the mempool transactions + // Reverse to include most recent at the top + txids.reverse(); async.eachSeries( - self.combinedArray, - function(txInfo, next) { - self.getDetailedInfo(txInfo, next); + txids, + function(txid, next) { + self.getDetailedInfo(txid, next); }, function(err) { if (err) { @@ -75,12 +95,15 @@ AddressHistory.prototype.get = function(callback) { }); } ); + }); + }; /** * A helper function to sort and slice/paginate the `combinedArray` */ +// TODO: Remove once txids summary results are verified to be sorted AddressHistory.prototype.sortAndPaginateCombinedArray = function() { this.combinedArray.sort(AddressHistory.sortByHeight); if (!_.isUndefined(this.options.from) && !_.isUndefined(this.options.to)) { @@ -94,6 +117,7 @@ AddressHistory.prototype.sortAndPaginateCombinedArray = function() { * @param {Object} a - An item from the `combinedArray` * @param {Object} b */ +// TODO: Remove once txids summary results are verified to be sorted AddressHistory.sortByHeight = function(a, b) { if (a.height < 0 && b.height < 0) { // Both are from the mempool, compare timestamps @@ -123,12 +147,12 @@ AddressHistory.sortByHeight = function(a, b) { * @param {Object} txInfo - An item from the `combinedArray` * @param {Function} next */ -AddressHistory.prototype.getDetailedInfo = function(txInfo, next) { +AddressHistory.prototype.getDetailedInfo = function(txid, next) { var self = this; var queryMempool = _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool; self.node.services.db.getTransactionWithBlockInfo( - txInfo.txid, + txid, queryMempool, function(err, transaction) { if (err) { @@ -136,13 +160,15 @@ AddressHistory.prototype.getDetailedInfo = function(txInfo, next) { } transaction.populateInputs(self.node.services.db, [], function(err) { - if(err) { + if (err) { return next(err); } + var addressDetails = self.getAddressDetailsForTransaction(transaction); + self.detailedArray.push({ - addresses: txInfo.addresses, - satoshis: self.getSatoshisDetail(transaction, txInfo), + addresses: addressDetails.addresses, + satoshis: addressDetails.satoshis, height: transaction.__height, confirmations: self.getConfirmationsDetail(transaction), timestamp: transaction.__timestamp, @@ -169,23 +195,52 @@ AddressHistory.prototype.getConfirmationsDetail = function(transaction) { return confirmations; }; -/** - * A helper function for `getDetailedInfo` for getting the satoshis. - * @param {Transaction} transaction - A transaction populated with previous outputs - * @param {Object} txInfo - An item from `combinedArray` - */ -AddressHistory.prototype.getSatoshisDetail = function(transaction, txInfo) { - var satoshis = txInfo.satoshis || 0; +AddressHistory.prototype.getAddressDetailsForTransaction = function(transaction) { + var result = { + addresses: {}, + satoshis: 0 + }; - for(var address in txInfo.addresses) { - if (txInfo.addresses[address].inputIndexes.length >= 0) { - for(var j = 0; j < txInfo.addresses[address].inputIndexes.length; j++) { - satoshis -= transaction.inputs[txInfo.addresses[address].inputIndexes[j]].output.satoshis; + for (var inputIndex = 0; inputIndex < transaction.inputs.length; inputIndex++) { + var input = transaction.inputs[inputIndex]; + if (!input.script) { + continue; + } + var inputAddress = input.script.toAddress(this.node.network); + if (inputAddress && this.addressStrings.indexOf(inputAddress.toString()) > 0) { + if (!result.addresses[inputAddress]) { + result.addresses[inputAddress] = { + inputIndexes: [], + outputIndexes: [] + }; + } else { + result.addresses[inputAddress].inputIndexes.push(inputIndex); } + result.satoshis -= input.output.satoshis; } } - return satoshis; + for (var outputIndex = 0; outputIndex < transaction.outputs.length; outputIndex++) { + var output = transaction.outputs[outputIndex]; + if (!output.script) { + continue; + } + var outputAddress = output.script.toAddress(this.node.network); + if (outputAddress && this.addressStrings.indexOf(outputAddress.toString()) > 0) { + if (!result.addresses[outputAddress]) { + result.addresses[outputAddress] = { + inputIndexes: [], + outputIndexes: [] + }; + } else { + result.addresses[outputAddress].inputIndexes.push(outputIndex); + } + result.satoshis += output.satoshis; + } + } + + return result; + }; module.exports = AddressHistory; diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 1eeb0404..0d3a5576 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -763,7 +763,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options * @param {Number} [options.end] - The relevant end block height * @param {Function} callback */ -AddressService.prototype.createInputsStream = function(addressStr, options, callback) { +AddressService.prototype.createInputsStream = function(addressStr, options) { var inputStream = new InputsTransformStream({ address: new Address(addressStr, this.node.network), @@ -1331,6 +1331,9 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb function(cache, next) { self._getAddressOutputsSummary(address, cache, tipHeight, next); }, + function(cache, next) { + self._sortTxids(cache, tipHeight, next); + }, function(cache, next) { self._saveAddressSummaryCache(address, cache, tipHeight, next); } @@ -1340,7 +1343,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb } var result = cache.result; - var confirmedTxids = Object.keys(result.appearanceIds); + var confirmedTxids = result.txids; var unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds); var summary = { @@ -1360,16 +1363,7 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb } if (!options.noTxList) { - var txids = confirmedTxids.concat(unconfirmedTxids); - - // sort by height - summary.txids = txids.sort(function(a, b) { - return a.height > b.height ? 1 : -1; - }).map(function(obj) { - return obj.txid; - }).filter(function(value, index, self) { - return self.indexOf(value) === index; - }); + summary.txids = confirmedTxids.concat(unconfirmedTxids); } callback(null, summary); @@ -1378,8 +1372,22 @@ AddressService.prototype.getAddressSummary = function(addressArg, options, callb }; +AddressService.prototype._sortTxids = function(cache, tipHeight, callback) { + if (cache.height === tipHeight) { + return callback(null, cache); + } + cache.result.txids = Object.keys(cache.result.appearanceIds); + cache.result.txids.sort(function(a, b) { + return cache.result.appearanceIds[a] - cache.result.appearanceIds[b]; + }); + callback(null, cache); +}; + AddressService.prototype._saveAddressSummaryCache = function(address, cache, tipHeight, callback) { - var transactionLength = Object.keys(cache.result.appearanceIds).length; + if (cache.height === tipHeight) { + return callback(null, cache); + } + var transactionLength = cache.result.txids.length; var exceedsCacheThreshold = (transactionLength > this.summaryCacheThreshold); if (exceedsCacheThreshold) { log.info('Saving address summary cache for: ' + address.toString() + 'at height: ' + tipHeight); @@ -1422,6 +1430,9 @@ AddressService.prototype._getAddressSummaryCache = function(address, callback) { }; AddressService.prototype._getAddressInputsSummary = function(address, cache, tipHeight, callback) { + if (cache.height === tipHeight) { + return callback(null, cache); + } $.checkArgument(address instanceof Address); var self = this; @@ -1435,7 +1446,7 @@ AddressService.prototype._getAddressInputsSummary = function(address, cache, tip var inputsStream = self.createInputsStream(address, opts); inputsStream.on('data', function(input) { var txid = input.txid; - cache.result.appearanceIds[txid] = true; + cache.result.appearanceIds[txid] = input.height; }); inputsStream.on('error', function(err) { @@ -1462,6 +1473,9 @@ AddressService.prototype._getAddressInputsSummary = function(address, cache, tip }; AddressService.prototype._getAddressOutputsSummary = function(address, cache, tipHeight, callback) { + if (cache.height === tipHeight) { + return callback(null, cache); + } $.checkArgument(address instanceof Address); $.checkArgument(!_.isUndefined(cache.result) && !_.isUndefined(cache.result.appearanceIds) && @@ -1484,7 +1498,7 @@ AddressService.prototype._getAddressOutputsSummary = function(address, cache, ti // Bitcoind's isSpent only works for confirmed transactions var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); cache.result.totalReceived += output.satoshis; - cache.result.appearanceIds[txid] = true; + cache.result.appearanceIds[txid] = output.height; if (!spentDB) { cache.result.balance += output.satoshis; diff --git a/lib/services/address/streams/combined.js b/lib/services/address/streams/combined.js deleted file mode 100644 index 050d8474..00000000 --- a/lib/services/address/streams/combined.js +++ /dev/null @@ -1,166 +0,0 @@ -'use strict'; - -var ReadableStream = require('stream').Readable; -var inherits = require('util').inherits; - -function TransactionInfoStream(options) { - ReadableStream.call(this, { - objectMode: true - }); - - // TODO: Be able to specify multiple input and output streams - // so that it's possible to query multiple addresses at the same time. - this._inputStream = options.inputStream; - this._outputStream = options.outputStream; - - // This holds a collection of combined inputs and outputs - // grouped into the matching block heights. - this._blocks = {}; - - this._inputCurrentHeight = 0; - this._outputCurrentHeight = 0; - this._inputFinishedHeights = []; - this._outputFinishedHeights = []; - this._inputEnded = false; - this._outputEnded = false; - - this._listenStreamEvents(); -} - -inherits(TransactionInfoStream, ReadableStream); - -TransactionInfoStream.prototype._listenStreamEvents = function() { - var self = this; - - self._inputStream.on('data', function(input) { - self._addToBlock(input); - if (input.height > self._inputCurrentHeight) { - self._inputFinishedHeights.push(input.height); - } - self._inputCurrentHeight = input.height; - self._maybePushBlock(); - }); - - self._outputStream.on('data', function(output) { - self._addToBlock(output); - if (output.height > self._outputCurrentHeight) { - self._outputFinishedHeights.push(output.height); - } - self._outputCurrentHeight = output.height; - self._maybePushBlock(); - }); - - self._inputStream.on('end', function() { - self._inputFinishedHeights.push(self._inputCurrentHeight); - self._inputEnded = true; - self._maybeEndStream(); - }); - - self._outputStream.on('end', function() { - self._outputFinishedHeights.push(self._outputCurrentHeight); - self._outputEnded = true; - self._maybeEndStream(); - }); - -}; - -TransactionInfoStream.prototype._read = function() { - this._inputStream.resume(); - this._outputStream.resume(); -}; - -TransactionInfoStream.prototype._addToBlock = function(data) { - if (!this._blocks[data.height]) { - this._blocks[data.height] = []; - } - this._blocks[data.height].push(data); -}; - -TransactionInfoStream.prototype._maybeEndStream = function() { - if (this._inputEnded && this._outputEnded) { - this._pushRemainingBlocks(); - this.push(null); - } -}; - -TransactionInfoStream.prototype._pushRemainingBlocks = function() { - var keys = Object.keys(this._blocks); - for (var i = 0; i < keys.length; i++) { - this.push(this._blocks[keys[i]]); - delete this._blocks[keys[i]]; - } -}; - -TransactionInfoStream.prototype._combineTransactionInfo = function(transactionInfo) { - var combinedArrayMap = {}; - var combinedArray = []; - var l = transactionInfo.length; - for(var i = 0; i < l; i++) { - var item = transactionInfo[i]; - var mapKey = item.txid; - if (combinedArrayMap[mapKey] >= 0) { - var combined = combinedArray[combinedArrayMap[mapKey]]; - if (!combined.addresses[item.address]) { - combined.addresses[item.address] = { - outputIndexes: [], - inputIndexes: [] - }; - } - if (item.outputIndex >= 0) { - combined.satoshis += item.satoshis; - combined.addresses[item.address].outputIndexes.push(item.outputIndex); - } else if (item.inputIndex >= 0) { - combined.addresses[item.address].inputIndexes.push(item.inputIndex); - } - } else { - item.addresses = {}; - item.addresses[item.address] = { - outputIndexes: [], - inputIndexes: [] - }; - if (item.outputIndex >= 0) { - item.addresses[item.address].outputIndexes.push(item.outputIndex); - } else if (item.inputIndex >= 0) { - item.addresses[item.address].inputIndexes.push(item.inputIndex); - } - delete item.outputIndex; - delete item.inputIndex; - delete item.address; - combinedArray.push(item); - combinedArrayMap[mapKey] = combinedArray.length - 1; - } - } - return combinedArray; -}; - -TransactionInfoStream.prototype._maybePushBlock = function() { - if (!this._inputFinishedHeights[0] && !this._outputFinishedHeights[0]) { - return; - } - - var inputFinished = this._inputFinishedHeights[0]; - var outputFinished = this._outputFinishedHeights[0]; - var bothFinished; - - if (inputFinished === outputFinished) { - bothFinished = inputFinished; - this._inputFinishedHeights.shift(); - this._outputFinishedHeights.shift(); - } else if (inputFinished <= outputFinished) { - bothFinished = inputFinished; - this._inputFinishedHeights.shift(); - } else if (outputFinished <= inputFinished) { - bothFinished = outputFinished; - this._outputFinishedHeights.shift(); - } - - if (bothFinished) { - var block = this._combineTransactionInfo(this._blocks[bothFinished]); - this.push(block); - delete this._blocks[bothFinished]; - //this._inputStream.pause(); - //this._outputStream.pause(); - } -}; - -module.exports = TransactionInfoStream;