From cab25cf397f67f8da60c3af3e2f120cff7fd5729 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Mon, 28 Dec 2015 15:43:20 -0500 Subject: [PATCH] Address Service: Start to use streams for memory optimization with large queries --- lib/services/address/constants.js | 41 + lib/services/address/encoding.js | 211 +++++ lib/services/address/index.js | 800 +++++++----------- .../address/streams/inputs-transform.js | 40 + .../address/streams/outputs-transform.js | 42 + 5 files changed, 663 insertions(+), 471 deletions(-) create mode 100644 lib/services/address/constants.js create mode 100644 lib/services/address/encoding.js create mode 100644 lib/services/address/streams/inputs-transform.js create mode 100644 lib/services/address/streams/outputs-transform.js diff --git a/lib/services/address/constants.js b/lib/services/address/constants.js new file mode 100644 index 00000000..7a0cf8dd --- /dev/null +++ b/lib/services/address/constants.js @@ -0,0 +1,41 @@ +'use strict'; + +var exports = {}; + +exports.PREFIXES = { + OUTPUTS: new Buffer('02', 'hex'), // Query outputs by address and/or height + SPENTS: new Buffer('03', 'hex'), // Query inputs by address and/or height + SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output +}; + +exports.MEMPREFIXES = { + OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address + SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address + SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output +}; + +// To save space, we're only storing the PubKeyHash or ScriptHash in our index. +// To avoid intentional unspendable collisions, which have been seen on the blockchain, +// we must store the hash type (PK or Script) as well. +exports.HASH_TYPES = { + PUBKEY: new Buffer('01', 'hex'), + REDEEMSCRIPT: new Buffer('02', 'hex') +}; + +// Translates from our enum type back into the hash types returned by +// bitcore-lib/address. +exports.HASH_TYPES_READABLE = { + '01': 'pubkeyhash', + '02': 'scripthash' +}; + +exports.HASH_TYPES_MAP = { + 'pubkeyhash': exports.HASH_TYPES.PUBKEY, + 'scripthash': exports.HASH_TYPES.REDEEMSCRIPT +}; + +exports.SPACER_MIN = new Buffer('00', 'hex'); +exports.SPACER_MAX = new Buffer('ff', 'hex'); + +module.exports = exports; + diff --git a/lib/services/address/encoding.js b/lib/services/address/encoding.js new file mode 100644 index 00000000..68313754 --- /dev/null +++ b/lib/services/address/encoding.js @@ -0,0 +1,211 @@ +'use strict'; + +var bitcore = require('bitcore-lib'); +var BufferReader = bitcore.encoding.BufferReader; +var Address = bitcore.Address; +var PublicKey = bitcore.PublicKey; +var constants = require('./constants'); +var $ = bitcore.util.preconditions; + +var exports = {}; + +exports.encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + txidBuffer, + outputIndexBuffer + ]); + return key.toString('binary'); +}; + +exports.encodeOutputKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) { + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + heightBuffer, + txidBuffer, + outputIndexBuffer + ]); + return key; +}; + +exports.decodeOutputKey = function(buffer) { + var reader = new BufferReader(buffer); + var prefix = reader.read(1); + var hashBuffer = reader.read(20); + var hashTypeBuffer = reader.read(1); + var spacer = reader.read(1); + var height = reader.readUInt32BE(); + var txid = reader.read(32); + var outputIndex = reader.readUInt32BE(); + return { + prefix: prefix, + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + height: height, + txid: txid, + outputIndex: outputIndex + }; +}; + +exports.encodeOutputValue = function(satoshis, scriptBuffer) { + var satoshisBuffer = new Buffer(8); + satoshisBuffer.writeDoubleBE(satoshis); + return Buffer.concat([satoshisBuffer, scriptBuffer]); +}; + +exports.decodeOutputValue = function(buffer) { + var satoshis = buffer.readDoubleBE(0); + var scriptBuffer = buffer.slice(8, buffer.length); + return { + satoshis: satoshis, + scriptBuffer: scriptBuffer + }; +}; + +exports.encodeInputKey = function(hashBuffer, hashTypeBuffer, height, prevTxIdBuffer, outputIndex) { + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + return Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + heightBuffer, + prevTxIdBuffer, + outputIndexBuffer + ]); +}; + +exports.decodeInputKey = function(buffer) { + var reader = new BufferReader(buffer); + var prefix = reader.read(1); + var hashBuffer = reader.read(20); + var hashTypeBuffer = reader.read(1); + var spacer = reader.read(1); + var height = reader.readUInt32BE(); + var prevTxId = reader.read(32); + var outputIndex = reader.readUInt32BE(); + return { + prefix: prefix, + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + height: height, + prevTxId: prevTxId, + outputIndex: outputIndex + }; +}; + +exports.encodeInputValue = function(txidBuffer, inputIndex) { + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + return Buffer.concat([ + txidBuffer, + inputIndexBuffer + ]); +}; + +exports.decodeInputValue = function(buffer) { + var txid = buffer.slice(0, 32); + var inputIndex = buffer.readUInt32BE(32); + return { + txid: txid, + inputIndex: inputIndex + }; +}; + +exports.encodeInputKeyMap = function(outputTxIdBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + return Buffer.concat([ + constants.PREFIXES.SPENTSMAP, + outputTxIdBuffer, + outputIndexBuffer + ]); +}; + +exports.decodeInputKeyMap = function(buffer) { + var txid = buffer.slice(1, 33); + var outputIndex = buffer.readUInt32BE(33); + return { + outputTxId: txid, + outputIndex: outputIndex + }; +}; + +exports.encodeInputValueMap = function(inputTxIdBuffer, inputIndex) { + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + return Buffer.concat([ + inputTxIdBuffer, + inputIndexBuffer + ]); +}; + +exports.decodeInputValueMap = function(buffer) { + var txid = buffer.slice(0, 32); + var inputIndex = buffer.readUInt32BE(32); + return { + inputTxId: txid, + inputIndex: inputIndex + }; +}; + +exports.getAddressInfo = function(addressStr) { + var addrObj = bitcore.Address(addressStr); + var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type]; + + return { + hashBuffer: addrObj.hashBuffer, + hashTypeBuffer: hashTypeBuffer, + hashTypeReadable: addrObj.type + }; +}; + +/** + * This function is optimized to return address information about an output script + * without constructing a Bitcore Address instance. + * @param {Script} - An instance of a Bitcore Script + * @param {Network|String} - The network for the address + */ +exports.extractAddressInfoFromScript = function(script, network) { + $.checkArgument(network, 'Second argument is expected to be a network'); + var hashBuffer; + var addressType; + var hashTypeBuffer; + if (script.isPublicKeyHashOut()) { + hashBuffer = script.chunks[2].buf; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + addressType = Address.PayToPublicKeyHash; + } else if (script.isScriptHashOut()) { + hashBuffer = script.chunks[1].buf; + hashTypeBuffer = constants.HASH_TYPES.REDEEMSCRIPT; + addressType = Address.PayToScriptHash; + } else if (script.isPublicKeyOut()) { + var pubkey = script.chunks[0].buf; + var address = Address.fromPublicKey(new PublicKey(pubkey), network); + hashBuffer = address.hashBuffer; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + // pay-to-publickey doesn't have an address, however for compatibility + // purposes, we can create an address + addressType = Address.PayToPublicKeyHash; + } else { + return false; + } + return { + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + addressType: addressType + }; +}; + +module.exports = exports; diff --git a/lib/services/address/index.js b/lib/services/address/index.js index ef7e3e2c..cf37e7e2 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -16,11 +16,14 @@ var memdown = require('memdown'); var $ = bitcore.util.preconditions; var _ = bitcore.deps._; var Hash = bitcore.crypto.Hash; -var BufferReader = bitcore.encoding.BufferReader; var EventEmitter = require('events').EventEmitter; -var PublicKey = bitcore.PublicKey; var Address = bitcore.Address; var AddressHistory = require('./history'); +var constants = require('./constants'); +var encoding = require('./encoding'); +var InputsTransformStream = require('./streams/inputs-transform'); +var OutputsTransformStream = require('./streams/outputs-transform'); + /** * The Address Service builds upon the Database Service and the Bitcoin Service to add additional @@ -58,42 +61,6 @@ AddressService.dependencies = [ 'db' ]; -AddressService.PREFIXES = { - OUTPUTS: new Buffer('02', 'hex'), // Query outputs by address and/or height - SPENTS: new Buffer('03', 'hex'), // Query inputs by address and/or height - SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output -}; - -AddressService.MEMPREFIXES = { - OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address - SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address - SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output -}; - -// To save space, we're only storing the PubKeyHash or ScriptHash in our index. -// To avoid intentional unspendable collisions, which have been seen on the blockchain, -// we must store the hash type (PK or Script) as well. -AddressService.HASH_TYPES = { - PUBKEY: new Buffer('01', 'hex'), - REDEEMSCRIPT: new Buffer('02', 'hex') -}; - -// Translates from our enum type back into the hash types returned by -// bitcore-lib/address. -AddressService.HASH_TYPES_READABLE = { - '01': 'pubkeyhash', - '02': 'scripthash' -}; - -// Trnaslates from address types to our enum type. -AddressService.HASH_TYPES_MAP = { - 'pubkeyhash': AddressService.HASH_TYPES.PUBKEY, - 'scripthash': AddressService.HASH_TYPES.REDEEMSCRIPT -}; - -AddressService.SPACER_MIN = new Buffer('00', 'hex'); -AddressService.SPACER_MAX = new Buffer('ff', 'hex'); - AddressService.prototype.start = function(callback) { var self = this; @@ -205,7 +172,7 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu return; } - var addressInfo = this._extractAddressInfoFromScript(script); + var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network); if (!addressInfo) { return; } @@ -312,7 +279,7 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { if (!output.script) { continue; } - var addressInfo = this._extractAddressInfoFromScript(output.script); + var addressInfo = encoding.extractAddressInfoFromScript(output.script, this.node.network); if (!addressInfo) { continue; } @@ -322,14 +289,14 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { outputIndexBuffer.writeUInt32BE(outputIndex); var outKey = Buffer.concat([ - AddressService.MEMPREFIXES.OUTPUTS, + constants.MEMPREFIXES.OUTPUTS, addressInfo.hashBuffer, addressInfo.hashTypeBuffer, txidBuffer, outputIndexBuffer ]); - var outValue = this._encodeOutputValue(output.satoshis, output._scriptBuffer); + var outValue = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer); operations.push({ type: action, @@ -347,7 +314,7 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { inputOutputIndexBuffer.writeUInt32BE(input.outputIndex); // Add an additional small spent index for fast synchronous lookups - var spentIndexSyncKey = this._encodeSpentIndexSyncKey( + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( input.prevTxId, input.outputIndex ); @@ -359,7 +326,7 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { // Add a more detailed spent index with values var spentIndexKey = Buffer.concat([ - AddressService.MEMPREFIXES.SPENTSMAP, + constants.MEMPREFIXES.SPENTSMAP, input.prevTxId, inputOutputIndexBuffer ]); @@ -380,15 +347,15 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { var inputHashType; if (input.script.isPublicKeyHashIn()) { inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[1].buf); - inputHashType = AddressService.HASH_TYPES.PUBKEY; + inputHashType = constants.HASH_TYPES.PUBKEY; } else if (input.script.isScriptHashIn()) { inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); - inputHashType = AddressService.HASH_TYPES.REDEEMSCRIPT; + inputHashType = constants.HASH_TYPES.REDEEMSCRIPT; } else { continue; } var inputKey = Buffer.concat([ - AddressService.MEMPREFIXES.SPENTS, + constants.MEMPREFIXES.SPENTS, inputHashBuffer, inputHashType, input.prevTxId, @@ -417,42 +384,6 @@ AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { this.mempoolIndex.batch(operations, callback); }; -/** - * This function is optimized to return address information about an output script - * without constructing a Bitcore Address instance. - * @param {Script} - An instance of a Bitcore Script - * @private - */ -AddressService.prototype._extractAddressInfoFromScript = function(script) { - var hashBuffer; - var addressType; - var hashTypeBuffer; - if (script.isPublicKeyHashOut()) { - hashBuffer = script.chunks[2].buf; - hashTypeBuffer = AddressService.HASH_TYPES.PUBKEY; - addressType = Address.PayToPublicKeyHash; - } else if (script.isScriptHashOut()) { - hashBuffer = script.chunks[1].buf; - hashTypeBuffer = AddressService.HASH_TYPES.REDEEMSCRIPT; - addressType = Address.PayToScriptHash; - } else if (script.isPublicKeyOut()) { - var pubkey = script.chunks[0].buf; - var address = Address.fromPublicKey(new PublicKey(pubkey), this.node.network); - hashBuffer = address.hashBuffer; - hashTypeBuffer = AddressService.HASH_TYPES.PUBKEY; - // pay-to-publickey doesn't have an address, however for compatibility - // purposes, we can create an address - addressType = Address.PayToPublicKeyHash; - } else { - return false; - } - return { - hashBuffer: hashBuffer, - hashTypeBuffer: hashTypeBuffer, - addressType: addressType - }; -}; - /** * The Database Service will run this function when blocks are connected and * disconnected to the chain during syncing and reorganizations. @@ -494,7 +425,7 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { continue; } - var addressInfo = this._extractAddressInfoFromScript(script); + var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network); if (!addressInfo) { continue; } @@ -504,9 +435,9 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { // can have a time that is previous to the previous block (however not // less than the mean of the 11 previous blocks) and not greater than 2 // hours in the future. - var key = this._encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer, - height, txidBuffer, outputIndex); - var value = this._encodeOutputValue(output.satoshis, output._scriptBuffer); + var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer, + height, txidBuffer, outputIndex); + var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer); operations.push({ type: action, key: key, @@ -549,10 +480,10 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { if (input.script.isPublicKeyHashIn()) { inputHash = Hash.sha256ripemd160(input.script.chunks[1].buf); - inputHashType = AddressService.HASH_TYPES.PUBKEY; + inputHashType = constants.HASH_TYPES.PUBKEY; } else if (input.script.isScriptHashIn()) { inputHash = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); - inputHashType = AddressService.HASH_TYPES.REDEEMSCRIPT; + inputHashType = constants.HASH_TYPES.REDEEMSCRIPT; } else { continue; } @@ -560,8 +491,8 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { var prevTxIdBuffer = new Buffer(input.prevTxId, 'hex'); // To be able to query inputs by address and spent height - var inputKey = this._encodeInputKey(inputHash, inputHashType, height, prevTxIdBuffer, input.outputIndex); - var inputValue = this._encodeInputValue(txidBuffer, inputIndex); + var inputKey = encoding.encodeInputKey(inputHash, inputHashType, height, prevTxIdBuffer, input.outputIndex); + var inputValue = encoding.encodeInputValue(txidBuffer, inputIndex); operations.push({ type: action, @@ -570,8 +501,8 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { }); // To be able to search for an input spending an output - var inputKeyMap = this._encodeInputKeyMap(prevTxIdBuffer, input.outputIndex); - var inputValueMap = this._encodeInputValueMap(txidBuffer, inputIndex); + var inputKeyMap = encoding.encodeInputKeyMap(prevTxIdBuffer, input.outputIndex); + var inputValueMap = encoding.encodeInputValueMap(txidBuffer, inputIndex); operations.push({ type: action, @@ -587,168 +518,6 @@ AddressService.prototype.blockHandler = function(block, addOutput, callback) { }); }; -AddressService.prototype._encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { - var outputIndexBuffer = new Buffer(4); - outputIndexBuffer.writeUInt32BE(outputIndex); - var key = Buffer.concat([ - txidBuffer, - outputIndexBuffer - ]); - return key.toString('binary'); -}; - -AddressService.prototype._encodeOutputKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) { - var heightBuffer = new Buffer(4); - heightBuffer.writeUInt32BE(height); - var outputIndexBuffer = new Buffer(4); - outputIndexBuffer.writeUInt32BE(outputIndex); - var key = Buffer.concat([ - AddressService.PREFIXES.OUTPUTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - heightBuffer, - txidBuffer, - outputIndexBuffer - ]); - return key; -}; - -AddressService.prototype._decodeOutputKey = function(buffer) { - var reader = new BufferReader(buffer); - var prefix = reader.read(1); - var hashBuffer = reader.read(20); - var hashTypeBuffer = reader.read(1); - var spacer = reader.read(1); - var height = reader.readUInt32BE(); - var txid = reader.read(32); - var outputIndex = reader.readUInt32BE(); - return { - prefix: prefix, - hashBuffer: hashBuffer, - hashTypeBuffer: hashTypeBuffer, - height: height, - txid: txid, - outputIndex: outputIndex - }; -}; - -AddressService.prototype._encodeOutputValue = function(satoshis, scriptBuffer) { - var satoshisBuffer = new Buffer(8); - satoshisBuffer.writeDoubleBE(satoshis); - return Buffer.concat([satoshisBuffer, scriptBuffer]); -}; - -AddressService.prototype._decodeOutputValue = function(buffer) { - var satoshis = buffer.readDoubleBE(0); - var scriptBuffer = buffer.slice(8, buffer.length); - return { - satoshis: satoshis, - scriptBuffer: scriptBuffer - }; -}; - -AddressService.prototype._encodeInputKey = function(hashBuffer, hashTypeBuffer, height, prevTxIdBuffer, outputIndex) { - var heightBuffer = new Buffer(4); - heightBuffer.writeUInt32BE(height); - var outputIndexBuffer = new Buffer(4); - outputIndexBuffer.writeUInt32BE(outputIndex); - return Buffer.concat([ - AddressService.PREFIXES.SPENTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - heightBuffer, - prevTxIdBuffer, - outputIndexBuffer - ]); -}; - -AddressService.prototype._decodeInputKey = function(buffer) { - var reader = new BufferReader(buffer); - var prefix = reader.read(1); - var hashBuffer = reader.read(20); - var hashTypeBuffer = reader.read(1); - var spacer = reader.read(1); - var height = reader.readUInt32BE(); - var prevTxId = reader.read(32); - var outputIndex = reader.readUInt32BE(); - return { - prefix: prefix, - hashBuffer: hashBuffer, - hashTypeBuffer: hashTypeBuffer, - height: height, - prevTxId: prevTxId, - outputIndex: outputIndex - }; -}; - -AddressService.prototype._encodeInputValue = function(txidBuffer, inputIndex) { - var inputIndexBuffer = new Buffer(4); - inputIndexBuffer.writeUInt32BE(inputIndex); - return Buffer.concat([ - txidBuffer, - inputIndexBuffer - ]); -}; - -AddressService.prototype._decodeInputValue = function(buffer) { - var txid = buffer.slice(0, 32); - var inputIndex = buffer.readUInt32BE(32); - return { - txid: txid, - inputIndex: inputIndex - }; -}; - -AddressService.prototype._encodeInputKeyMap = function(outputTxIdBuffer, outputIndex) { - var outputIndexBuffer = new Buffer(4); - outputIndexBuffer.writeUInt32BE(outputIndex); - return Buffer.concat([ - AddressService.PREFIXES.SPENTSMAP, - outputTxIdBuffer, - outputIndexBuffer - ]); -}; - -AddressService.prototype._decodeInputKeyMap = function(buffer) { - var txid = buffer.slice(1, 33); - var outputIndex = buffer.readUInt32BE(33); - return { - outputTxId: txid, - outputIndex: outputIndex - }; -}; - -AddressService.prototype._encodeInputValueMap = function(inputTxIdBuffer, inputIndex) { - var inputIndexBuffer = new Buffer(4); - inputIndexBuffer.writeUInt32BE(inputIndex); - return Buffer.concat([ - inputTxIdBuffer, - inputIndexBuffer - ]); -}; - -AddressService.prototype._decodeInputValueMap = function(buffer) { - var txid = buffer.slice(0, 32); - var inputIndex = buffer.readUInt32BE(32); - return { - inputTxId: txid, - inputIndex: inputIndex - }; -}; - -AddressService.prototype._getAddressInfo = function(addressStr) { - var addrObj = bitcore.Address(addressStr); - var hashTypeBuffer = AddressService.HASH_TYPES_MAP[addrObj.type]; - - return { - hashBuffer: addrObj.hashBuffer, - hashTypeBuffer: hashTypeBuffer, - hashTypeReadable: addrObj.type - }; -}; - /** * This function is responsible for emitting events to any subscribers to the * `address/transaction` event. @@ -926,12 +695,12 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options txidBuffer = new Buffer(txid, 'hex'); } if (options.queryMempool) { - var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, outputIndex); + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex); if (this.mempoolSpentIndex[spentIndexSyncKey]) { return this._getSpentMempool(txidBuffer, outputIndex, callback); } } - var key = this._encodeInputKeyMap(txidBuffer, outputIndex); + var key = encoding.encodeInputKeyMap(txidBuffer, outputIndex); var dbOptions = { valueEncoding: 'binary', keyEncoding: 'binary' @@ -942,7 +711,7 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options } else if (err) { return callback(err); } - var value = self._decodeInputValueMap(buffer); + var value = encoding.decodeInputValueMap(buffer); callback(null, { inputTxId: value.inputTxId.toString('hex'), inputIndex: value.inputIndex @@ -950,10 +719,78 @@ AddressService.prototype.getInputForOutput = function(txid, outputIndex, options }); }; +/** + * A streaming equivalent to `getInputs`, and returns a transform stream with data + * emitted in the same format as `getInputs`. + * + * @param {String} addressStr - The relevant address + * @param {Object} options - Additional options for query the outputs + * @param {Number} [options.start] - The relevant start block height + * @param {Number} [options.end] - The relevant end block height + * @param {Function} callback + */ +AddressService.prototype.createInputsStream = function(addressStr, options, callback) { + + var inputStream = new InputsTransformStream({ + address: new Address(addressStr, this.node.network), + tipHeight: this.node.services.db.tip.__height + }); + + var stream = this.createInputsDBStream(addressStr, options).pipe(inputStream); + + return stream; + +}; + +AddressService.prototype.createInputsDBStream = function(addressStr, options) { + var stream; + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + + if (options.start && options.end) { + + var endBuffer = new Buffer(4); + endBuffer.writeUInt32BE(options.end); + + var startBuffer = new Buffer(4); + startBuffer.writeUInt32BE(options.start + 1); + + stream = this.node.services.db.store.createReadStream({ + gte: Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + endBuffer + ]), + lte: Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + startBuffer + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } else { + var allKey = Buffer.concat([constants.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]); + stream = this.node.services.db.store.createReadStream({ + gte: Buffer.concat([allKey, constants.SPACER_MIN]), + lte: Buffer.concat([allKey, constants.SPACER_MAX]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } + + return stream; +}; + /** * Will give inputs that spend previous outputs for an address as an object with: * address - The base58check encoded address - * hashType - The type of the address, e.g. 'pubkeyhash' or 'scripthash' + * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' * txid - A string of the transaction hash * outputIndex - A number of corresponding transaction input * height - The height of the block the transaction was included, will be -1 for mempool transactions @@ -971,67 +808,15 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) { var self = this; var inputs = []; - var stream; - var addrObj = this._getAddressInfo(addressStr); + var addrObj = encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; - if (!hashTypeBuffer) { - return callback(new Error('Unknown address type: ' + addrObj.hashTypeReadable + ' for address: ' + addressStr)); - } - if (options.start && options.end) { - - var endBuffer = new Buffer(4); - endBuffer.writeUInt32BE(options.end); - - var startBuffer = new Buffer(4); - startBuffer.writeUInt32BE(options.start + 1); - - stream = this.node.services.db.store.createReadStream({ - gte: Buffer.concat([ - AddressService.PREFIXES.SPENTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - endBuffer - ]), - lte: Buffer.concat([ - AddressService.PREFIXES.SPENTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - startBuffer - ]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } else { - var allKey = Buffer.concat([AddressService.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ - gte: Buffer.concat([allKey, AddressService.SPACER_MIN]), - lte: Buffer.concat([allKey, AddressService.SPACER_MAX]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } - - stream.on('data', function(data) { - - var key = self._decodeInputKey(data.key); - var value = self._decodeInputValue(data.value); - - var input = { - address: addressStr, - hashType: addrObj.hashTypeReadable, - txid: value.txid.toString('hex'), - inputIndex: value.inputIndex, - height: key.height, - confirmations: self.node.services.db.tip.__height - key.height + 1 - }; + var stream = this.createInputsStream(addressStr, options); + stream.on('data', function(input) { inputs.push(input); - }); var error; @@ -1042,7 +827,7 @@ AddressService.prototype.getInputs = function(addressStr, options, callback) { } }); - stream.on('close', function() { + stream.on('end', function() { if (error) { return callback(error); } @@ -1071,16 +856,16 @@ AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, ha var stream = self.mempoolIndex.createReadStream({ gte: Buffer.concat([ - AddressService.MEMPREFIXES.SPENTS, + constants.MEMPREFIXES.SPENTS, hashBuffer, hashTypeBuffer, - AddressService.SPACER_MIN + constants.SPACER_MIN ]), lte: Buffer.concat([ - AddressService.MEMPREFIXES.SPENTS, + constants.MEMPREFIXES.SPENTS, hashBuffer, hashTypeBuffer, - AddressService.SPACER_MAX + constants.SPACER_MAX ]), valueEncoding: 'binary', keyEncoding: 'binary' @@ -1091,7 +876,7 @@ AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, ha var inputIndex = data.value.readUInt32BE(32); var output = { address: addressStr, - hashType: AddressService.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], + hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], txid: txid.toString('hex'), //TODO use a buffer inputIndex: inputIndex, height: -1, @@ -1121,7 +906,7 @@ AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, ca var outputIndexBuffer = new Buffer(4); outputIndexBuffer.writeUInt32BE(outputIndex); var spentIndexKey = Buffer.concat([ - AddressService.MEMPREFIXES.SPENTSMAP, + constants.MEMPREFIXES.SPENTSMAP, txidBuffer, outputIndexBuffer ]); @@ -1142,10 +927,69 @@ AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, ca ); }; +AddressService.prototype.createOutputsStream = function(addressStr, options) { + + var outputStream = new OutputsTransformStream({ + address: new Address(addressStr, this.node.network), + tipHeight: this.node.services.db.tip.__height + }); + + var stream = this.createOutputsDBStream(addressStr, options).pipe(outputStream); + + return stream; + +}; + +AddressService.prototype.createOutputsDBStream = function(addressStr, options) { + + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + var stream; + + if (options.start && options.end) { + + var startBuffer = new Buffer(4); + startBuffer.writeUInt32BE(options.start + 1); + var endBuffer = new Buffer(4); + endBuffer.writeUInt32BE(options.end); + + stream = this.node.services.db.store.createReadStream({ + gte: Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + endBuffer + ]), + lte: Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + startBuffer + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } else { + var allKey = Buffer.concat([constants.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]); + stream = this.node.services.db.store.createReadStream({ + gte: Buffer.concat([allKey, constants.SPACER_MIN]), + lte: Buffer.concat([allKey, constants.SPACER_MAX]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } + + return stream; + +}; + /** * Will give outputs for an address as an object with: * address - The base58check encoded address - * hashType - The type of the address, e.g. 'pubkeyhash' or 'scripthash' + * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' * txid - A string of the transaction hash * outputIndex - A number of corresponding transaction output * height - The height of the block the transaction was included, will be -1 for mempool transactions @@ -1165,7 +1009,7 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) { $.checkArgument(_.isObject(options), 'Second argument is expected to be an options object.'); $.checkArgument(_.isFunction(callback), 'Third argument is expected to be a callback function.'); - var addrObj = this._getAddressInfo(addressStr); + var addrObj = encoding.getAddressInfo(addressStr); var hashBuffer = addrObj.hashBuffer; var hashTypeBuffer = addrObj.hashTypeBuffer; if (!hashTypeBuffer) { @@ -1173,61 +1017,10 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) { } var outputs = []; - var stream; - - if (options.start && options.end) { - - var startBuffer = new Buffer(4); - startBuffer.writeUInt32BE(options.start + 1); - var endBuffer = new Buffer(4); - endBuffer.writeUInt32BE(options.end); - - stream = this.node.services.db.store.createReadStream({ - gte: Buffer.concat([ - AddressService.PREFIXES.OUTPUTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - endBuffer - ]), - lte: Buffer.concat([ - AddressService.PREFIXES.OUTPUTS, - hashBuffer, - hashTypeBuffer, - AddressService.SPACER_MIN, - startBuffer - ]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } else { - var allKey = Buffer.concat([AddressService.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]); - stream = this.node.services.db.store.createReadStream({ - gte: Buffer.concat([allKey, AddressService.SPACER_MIN]), - lte: Buffer.concat([allKey, AddressService.SPACER_MAX]), - valueEncoding: 'binary', - keyEncoding: 'binary' - }); - } + var stream = this.createOutputsStream(addressStr, options); stream.on('data', function(data) { - - var key = self._decodeOutputKey(data.key); - var value = self._decodeOutputValue(data.value); - - var output = { - address: addressStr, - hashType: addrObj.hashTypeReadable, - txid: key.txid.toString('hex'), //TODO use a buffer - outputIndex: key.outputIndex, - height: key.height, - satoshis: value.satoshis, - script: value.scriptBuffer.toString('hex'), //TODO use a buffer - confirmations: self.node.services.db.tip.__height - key.height + 1 - }; - - outputs.push(output); - + outputs.push(data); }); var error; @@ -1238,7 +1031,7 @@ AddressService.prototype.getOutputs = function(addressStr, options, callback) { } }); - stream.on('close', function() { + stream.on('end', function() { if (error) { return callback(error); } @@ -1266,16 +1059,16 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h var stream = self.mempoolIndex.createReadStream({ gte: Buffer.concat([ - AddressService.MEMPREFIXES.OUTPUTS, + constants.MEMPREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer, - AddressService.SPACER_MIN + constants.SPACER_MIN ]), lte: Buffer.concat([ - AddressService.MEMPREFIXES.OUTPUTS, + constants.MEMPREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer, - AddressService.SPACER_MAX + constants.SPACER_MAX ]), valueEncoding: 'binary', keyEncoding: 'binary' @@ -1285,10 +1078,10 @@ AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, h // Format of data: prefix: 1, hashBuffer: 20, hashTypeBuffer: 1, txid: 32, outputIndex: 4 var txid = data.key.slice(22, 54); var outputIndex = data.key.readUInt32BE(54); - var value = self._decodeOutputValue(data.value); + var value = encoding.decodeOutputValue(data.value); var output = { address: addressStr, - hashType: AddressService.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], + hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], txid: txid.toString('hex'), //TODO use a buffer outputIndex: outputIndex, height: -1, @@ -1407,7 +1200,7 @@ AddressService.prototype.isSpent = function(output, options, callback) { var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); if (!spent && queryMempool) { var txidBuffer = new Buffer(txid, 'hex'); - var spentIndexSyncKey = this._encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; } setImmediate(function() { @@ -1474,111 +1267,176 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba * @param {Boolean} [options.noTxList] - if set, txid array will not be included * @param {Function} callback */ -AddressService.prototype.getAddressSummary = function(address, options, callback) { +AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { var self = this; - var opt = { - queryMempool: true + var address = new Address(addressArg); + + async.waterfall([ + function(next) { + self._getAddressInputsSummary(address, options, next); + }, + function(result, next) { + self._getAddressOutputsSummary(address, options, result, next); + } + ], function(err, result) { + if (err) { + return callback(err); + } + + var confirmedTxids = Object.keys(result.appearanceIds); + var unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds); + + var summary = { + totalReceived: result.totalReceived, + totalSpent: result.totalSpent, + balance: result.balance, + unconfirmedBalance: result.unconfirmedBalance, + appearances: confirmedTxids.length, + unconfirmedAppearances: unconfirmedTxids.length + }; + + if (!options.noTxList) { + var txids = confirmedTxids.concat(unconfirmedTxids); + + // sort by height + summary.txids = txids.sort(function(a, b) { + return a.height > b.height ? 1 : -1; + }).map(function(obj) { + return obj.txid; + }).filter(function(value, index, self) { + return self.indexOf(value) === index; + }); + } + + callback(null, summary); + + }); + +}; + +AddressService.prototype._getAddressInputsSummary = function(address, options, callback) { + $.checkArgument(address instanceof Address); + var self = this; + + var error = null; + var result = { + appearanceIds: {}, + unconfirmedAppearanceIds: {}, }; - var outputs; - var inputs; + var inputsStream = self.createInputsStream(address, options); + inputsStream.on('data', function(input) { + var txid = input.txid; + result.appearanceIds[txid] = true; + }); - async.parallel( - [ - function(next) { - self.getInputs(address, opt, function(err, ins) { - inputs = ins; - next(err); - }); - }, - function(next) { - self.getOutputs(address, opt, function(err, outs) { - outputs = outs; - next(err); - }); + inputsStream.on('error', function(err) { + error = err; + }); + + inputsStream.on('end', function() { + + var addressStr = address.toString(); + var hashBuffer = address.hashBuffer; + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + + self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { + if (err) { + return callback(err); } - ], - function(err) { - if(err) { + for(var i = 0; i < mempoolInputs.length; i++) { + var input = mempoolInputs[i]; + result.unconfirmedAppearanceIds[input.txid] = true; + } + callback(error, result); + }); + }); +}; + +AddressService.prototype._getAddressOutputsSummary = function(address, options, result, callback) { + $.checkArgument(address instanceof Address); + $.checkArgument(!_.isUndefined(result) && + !_.isUndefined(result.appearanceIds) && + !_.isUndefined(result.unconfirmedAppearanceIds)); + + var self = this; + + var outputStream = self.createOutputsStream(address, options); + + result.totalReceived = 0; + result.totalSpent = 0; + result.balance = 0; + result.unconfirmedBalance = 0; + + outputStream.on('data', function(output) { + + var txid = output.txid; + var outputIndex = output.outputIndex; + + // Bitcoind's isSpent only works for confirmed transactions + var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); + result.totalReceived += output.satoshis; + result.appearanceIds[txid] = true; + + if (spentDB) { + result.totalSpent += output.satoshis; + } else { + result.balance += output.satoshis; + } + + // Check to see if this output is spent in the mempool and if so + // we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta) + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + new Buffer(txid, 'hex'), // TODO: get buffer directly + outputIndex + ); + var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; + if (spentMempool) { + result.unconfirmedBalance -= output.satoshis; + } + + }); + + var error = null; + + outputStream.on('error', function(err) { + error = err; + }); + + outputStream.on('end', function() { + + var addressStr = address.toString(); + var hashBuffer = address.hashBuffer; + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + + self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { + if (err) { return callback(err); } - var totalReceived = 0; - var totalSpent = 0; - var balance = 0; - var unconfirmedBalance = 0; - var appearanceIds = {}; - var unconfirmedAppearanceIds = {}; - var txids = []; + for(var i = 0; i < mempoolOutputs.length; i++) { + var output = mempoolOutputs[i]; - for(var i = 0; i < outputs.length; i++) { - // Bitcoind's isSpent only works for confirmed transactions - var spentDB = self.node.services.bitcoind.isSpent(outputs[i].txid, outputs[i].outputIndex); - var spentIndexSyncKey = self._encodeSpentIndexSyncKey( - new Buffer(outputs[i].txid, 'hex'), // TODO: get buffer directly - outputs[i].outputIndex + result.unconfirmedAppearanceIds[output.txid] = true; + + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + new Buffer(output.txid, 'hex'), // TODO: get buffer directly + output.outputIndex ); var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; - - txids.push(outputs[i]); - - if(outputs[i].confirmations) { - totalReceived += outputs[i].satoshis; - balance += outputs[i].satoshis; - appearanceIds[outputs[i].txid] = true; - } else { - unconfirmedBalance += outputs[i].satoshis; - unconfirmedAppearanceIds[outputs[i].txid] = true; - } - - if(spentDB || spentMempool) { - if(spentDB) { - totalSpent += outputs[i].satoshis; - balance -= outputs[i].satoshis; - } else if(!outputs[i].confirmations) { - unconfirmedBalance -= outputs[i].satoshis; - } + // Only add this to the balance if it's not spent in the mempool already + if (!spentMempool) { + result.unconfirmedBalance += output.satoshis; } } - for(var j = 0; j < inputs.length; j++) { - if (inputs[j].confirmations) { - appearanceIds[inputs[j].txid] = true; - } else { - unconfirmedAppearanceIds[outputs[j].txid] = true; - } - } + callback(error, result); - var summary = { - totalReceived: totalReceived, - totalSpent: totalSpent, - balance: balance, - unconfirmedBalance: unconfirmedBalance, - appearances: Object.keys(appearanceIds).length, - unconfirmedAppearances: Object.keys(unconfirmedAppearanceIds).length - }; + }); - if(!options.noTxList) { - for(var i = 0; i < inputs.length; i++) { - txids.push(inputs[i]); - } + }); - // sort by height - txids = txids.sort(function(a, b) { - return a.height > b.height ? 1 : -1; - }).map(function(obj) { - return obj.txid; - }).filter(function(value, index, self) { - return self.indexOf(value) === index; - }); - - summary.txids = txids; - } - - callback(null, summary); - } - ); }; module.exports = AddressService; diff --git a/lib/services/address/streams/inputs-transform.js b/lib/services/address/streams/inputs-transform.js new file mode 100644 index 00000000..8b8f71d3 --- /dev/null +++ b/lib/services/address/streams/inputs-transform.js @@ -0,0 +1,40 @@ +'use strict'; + +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var bitcore = require('bitcore-lib'); +var encodingUtil = require('../encoding'); +var $ = bitcore.util.preconditions; + +function InputsTransformStream(options) { + $.checkArgument(options.address instanceof bitcore.Address); + Transform.call(this, { + objectMode: true + }); + this._address = options.address; + this._addressStr = this._address.toString(); + this._tipHeight = options.tipHeight; +} +inherits(InputsTransformStream, Transform); + +InputsTransformStream.prototype._transform = function(chunk, encoding, callback) { + var self = this; + + var key = encodingUtil.decodeInputKey(chunk.key); + var value = encodingUtil.decodeInputValue(chunk.value); + + var input = { + address: this._addressStr, + hashType: this._address.type, + txid: value.txid.toString('hex'), + inputIndex: value.inputIndex, + height: key.height, + confirmations: this._tipHeight - key.height + 1 + }; + + self.push(input); + callback(); + +}; + +module.exports = InputsTransformStream; diff --git a/lib/services/address/streams/outputs-transform.js b/lib/services/address/streams/outputs-transform.js new file mode 100644 index 00000000..b9c8e8d3 --- /dev/null +++ b/lib/services/address/streams/outputs-transform.js @@ -0,0 +1,42 @@ +'use strict'; + +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var bitcore = require('bitcore-lib'); +var encodingUtil = require('../encoding'); +var $ = bitcore.util.preconditions; + +function OutputsTransformStream(options) { + Transform.call(this, { + objectMode: true + }); + $.checkArgument(options.address instanceof bitcore.Address); + this._address = options.address; + this._addressStr = this._address.toString(); + this._tipHeight = options.tipHeight; +} +inherits(OutputsTransformStream, Transform); + +OutputsTransformStream.prototype._transform = function(chunk, encoding, callback) { + var self = this; + + var key = encodingUtil.decodeOutputKey(chunk.key); + var value = encodingUtil.decodeOutputValue(chunk.value); + + var output = { + address: this._addressStr, + hashType: this._address.type, + txid: key.txid.toString('hex'), //TODO use a buffer + outputIndex: key.outputIndex, + height: key.height, + satoshis: value.satoshis, + script: value.scriptBuffer.toString('hex'), //TODO use a buffer + confirmations: this._tipHeight - key.height + 1 + }; + + self.push(output); + callback(); + +}; + +module.exports = OutputsTransformStream;