diff --git a/lib/node.js b/lib/node.js index d7e95c4f..bebc0288 100644 --- a/lib/node.js +++ b/lib/node.js @@ -49,6 +49,7 @@ function Node(config) { this.log.formatting = config.formatLogs ? true : false; } + this.datadir = config.datadir; this.network = null; this.services = {}; this._unloadedServices = []; diff --git a/lib/scaffold/default-config.js b/lib/scaffold/default-config.js index 7075a7fc..f8f0b273 100644 --- a/lib/scaffold/default-config.js +++ b/lib/scaffold/default-config.js @@ -24,7 +24,7 @@ function getDefaultConfig(options) { mkdirp.sync(defaultPath); } - var defaultServices = ['bitcoind', 'web']; + var defaultServices = ['bitcoind', 'db', 'address', 'web']; if (options.additionalServices) { defaultServices = defaultServices.concat(options.additionalServices); } diff --git a/lib/scaffold/start.js b/lib/scaffold/start.js index e8654575..e1a4acf9 100644 --- a/lib/scaffold/start.js +++ b/lib/scaffold/start.js @@ -15,6 +15,9 @@ log.debug = function() {}; * "db" service, or having "datadir" at the root of the config. */ function checkConfigVersion2(fullConfig) { + // TODO: Remove + return false; + var datadirUndefined = _.isUndefined(fullConfig.datadir); var addressDefined = (fullConfig.services.indexOf('address') >= 0); var dbDefined = (fullConfig.services.indexOf('db') >= 0); @@ -78,10 +81,6 @@ function start(options) { fullConfig.path = path.resolve(options.path, './bitcore-node.json'); - if (checkConfigVersion2(fullConfig)) { - process.exit(1); - } - fullConfig.services = start.setupServices(require, servicesPath, options.config); var node = new BitcoreNode(fullConfig); @@ -138,6 +137,7 @@ function loadModule(req, service) { // first try in the built-in bitcore-node services directory service.module = req(path.resolve(__dirname, '../services/' + service.name)); } catch(e) { + console.log(e); // check if the package.json specifies a specific file to use var servicePackage = req(service.name + '/package.json'); diff --git a/lib/services/address/constants.js b/lib/services/address/constants.js new file mode 100644 index 00000000..3653e9cb --- /dev/null +++ b/lib/services/address/constants.js @@ -0,0 +1,56 @@ +'use strict'; + +var exports = {}; + +exports.PREFIXES = { + OUTPUTS: new Buffer('02', 'hex'), // Query outputs by address and/or height + SPENTS: new Buffer('03', 'hex'), // Query inputs by address and/or height + SPENTSMAP: new Buffer('05', 'hex') // Get the input that spends an output +}; + +exports.MEMPREFIXES = { + OUTPUTS: new Buffer('01', 'hex'), // Query mempool outputs by address + SPENTS: new Buffer('02', 'hex'), // Query mempool inputs by address + SPENTSMAP: new Buffer('03', 'hex') // Query mempool for the input that spends an output +}; + +// To save space, we're only storing the PubKeyHash or ScriptHash in our index. +// To avoid intentional unspendable collisions, which have been seen on the blockchain, +// we must store the hash type (PK or Script) as well. +exports.HASH_TYPES = { + PUBKEY: new Buffer('01', 'hex'), + REDEEMSCRIPT: new Buffer('02', 'hex') +}; + +// Translates from our enum type back into the hash types returned by +// bitcore-lib/address. +exports.HASH_TYPES_READABLE = { + '01': 'pubkeyhash', + '02': 'scripthash' +}; + +exports.HASH_TYPES_MAP = { + 'pubkeyhash': exports.HASH_TYPES.PUBKEY, + 'scripthash': exports.HASH_TYPES.REDEEMSCRIPT +}; + +exports.SPACER_MIN = new Buffer('00', 'hex'); +exports.SPACER_MAX = new Buffer('ff', 'hex'); +exports.SPACER_HEIGHT_MIN = new Buffer('0000000000', 'hex'); +exports.SPACER_HEIGHT_MAX = new Buffer('ffffffffff', 'hex'); +exports.TIMESTAMP_MIN = new Buffer('0000000000000000', 'hex'); +exports.TIMESTAMP_MAX = new Buffer('ffffffffffffffff', 'hex'); + +// The maximum number of inputs that can be queried at once +exports.MAX_INPUTS_QUERY_LENGTH = 50000; +// The maximum number of outputs that can be queried at once +exports.MAX_OUTPUTS_QUERY_LENGTH = 50000; +// The maximum number of transactions that can be queried at once +exports.MAX_HISTORY_QUERY_LENGTH = 100; +// The maximum number of addresses that can be queried at once +exports.MAX_ADDRESSES_QUERY = 10000; +// The maximum number of simultaneous requests +exports.MAX_ADDRESSES_LIMIT = 5; + +module.exports = exports; + diff --git a/lib/services/address/encoding.js b/lib/services/address/encoding.js new file mode 100644 index 00000000..8ebce51c --- /dev/null +++ b/lib/services/address/encoding.js @@ -0,0 +1,307 @@ +'use strict'; + +var bitcore = require('bitcore-lib'); +var BufferReader = bitcore.encoding.BufferReader; +var Address = bitcore.Address; +var PublicKey = bitcore.PublicKey; +var constants = require('./constants'); +var $ = bitcore.util.preconditions; + +var exports = {}; + +exports.encodeSpentIndexSyncKey = function(txidBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + txidBuffer, + outputIndexBuffer + ]); + return key.toString('binary'); +}; + +exports.encodeMempoolAddressIndexKey = function(hashBuffer, hashTypeBuffer) { + var key = Buffer.concat([ + hashBuffer, + hashTypeBuffer, + ]); + return key.toString('binary'); +}; + + +exports.encodeOutputKey = function(hashBuffer, hashTypeBuffer, height, txidBuffer, outputIndex) { + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var key = Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + heightBuffer, + txidBuffer, + outputIndexBuffer + ]); + return key; +}; + +exports.decodeOutputKey = function(buffer) { + var reader = new BufferReader(buffer); + var prefix = reader.read(1); + var hashBuffer = reader.read(20); + var hashTypeBuffer = reader.read(1); + var spacer = reader.read(1); + var height = reader.readUInt32BE(); + var txid = reader.read(32); + var outputIndex = reader.readUInt32BE(); + return { + prefix: prefix, + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + height: height, + txid: txid, + outputIndex: outputIndex + }; +}; + +exports.encodeOutputValue = function(satoshis, scriptBuffer) { + var satoshisBuffer = new Buffer(8); + satoshisBuffer.writeDoubleBE(satoshis); + return Buffer.concat([satoshisBuffer, scriptBuffer]); +}; + +exports.encodeOutputMempoolValue = function(satoshis, timestampBuffer, scriptBuffer) { + var satoshisBuffer = new Buffer(8); + satoshisBuffer.writeDoubleBE(satoshis); + return Buffer.concat([satoshisBuffer, timestampBuffer, scriptBuffer]); +}; + +exports.decodeOutputValue = function(buffer) { + var satoshis = buffer.readDoubleBE(0); + var scriptBuffer = buffer.slice(8, buffer.length); + return { + satoshis: satoshis, + scriptBuffer: scriptBuffer + }; +}; + +exports.decodeOutputMempoolValue = function(buffer) { + var satoshis = buffer.readDoubleBE(0); + var timestamp = buffer.readDoubleBE(8); + var scriptBuffer = buffer.slice(16, buffer.length); + return { + satoshis: satoshis, + timestamp: timestamp, + scriptBuffer: scriptBuffer + }; +}; + +exports.encodeInputKey = function(hashBuffer, hashTypeBuffer, height, prevTxIdBuffer, outputIndex) { + var heightBuffer = new Buffer(4); + heightBuffer.writeUInt32BE(height); + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + return Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + heightBuffer, + prevTxIdBuffer, + outputIndexBuffer + ]); +}; + +exports.decodeInputKey = function(buffer) { + var reader = new BufferReader(buffer); + var prefix = reader.read(1); + var hashBuffer = reader.read(20); + var hashTypeBuffer = reader.read(1); + var spacer = reader.read(1); + var height = reader.readUInt32BE(); + var prevTxId = reader.read(32); + var outputIndex = reader.readUInt32BE(); + return { + prefix: prefix, + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + height: height, + prevTxId: prevTxId, + outputIndex: outputIndex + }; +}; + +exports.encodeInputValue = function(txidBuffer, inputIndex) { + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + return Buffer.concat([ + txidBuffer, + inputIndexBuffer + ]); +}; + +exports.decodeInputValue = function(buffer) { + var txid = buffer.slice(0, 32); + var inputIndex = buffer.readUInt32BE(32); + return { + txid: txid, + inputIndex: inputIndex + }; +}; + +exports.encodeInputKeyMap = function(outputTxIdBuffer, outputIndex) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + return Buffer.concat([ + constants.PREFIXES.SPENTSMAP, + outputTxIdBuffer, + outputIndexBuffer + ]); +}; + +exports.decodeInputKeyMap = function(buffer) { + var txid = buffer.slice(1, 33); + var outputIndex = buffer.readUInt32BE(33); + return { + outputTxId: txid, + outputIndex: outputIndex + }; +}; + +exports.encodeInputValueMap = function(inputTxIdBuffer, inputIndex) { + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + return Buffer.concat([ + inputTxIdBuffer, + inputIndexBuffer + ]); +}; + +exports.decodeInputValueMap = function(buffer) { + var txid = buffer.slice(0, 32); + var inputIndex = buffer.readUInt32BE(32); + return { + inputTxId: txid, + inputIndex: inputIndex + }; +}; + +exports.encodeSummaryCacheKey = function(address) { + return Buffer.concat([address.hashBuffer, constants.HASH_TYPES_MAP[address.type]]); +}; + +exports.decodeSummaryCacheKey = function(buffer, network) { + var hashBuffer = buffer.read(20); + var type = constants.HASH_TYPES_READABLE[buffer.read(20, 2).toString('hex')]; + var address = new Address({ + hashBuffer: hashBuffer, + type: type, + network: network + }); + return address; +}; + +exports.encodeSummaryCacheValue = function(cache, tipHeight, tipHash) { + var tipHashBuffer = new Buffer(tipHash, 'hex'); + var buffer = new Buffer(new Array(20)); + buffer.writeUInt32BE(tipHeight); + buffer.writeDoubleBE(cache.result.totalReceived, 4); + buffer.writeDoubleBE(cache.result.balance, 12); + var txidBuffers = []; + for (var i = 0; i < cache.result.txids.length; i++) { + var buf = new Buffer(new Array(36)); + var txid = cache.result.txids[i]; + buf.write(txid, 'hex'); + buf.writeUInt32BE(cache.result.appearanceIds[txid], 32); + txidBuffers.push(buf); + } + var txidsBuffer = Buffer.concat(txidBuffers); + var value = Buffer.concat([tipHashBuffer, buffer, txidsBuffer]); + + return value; +}; + +exports.decodeSummaryCacheValue = function(buffer) { + + var hash = buffer.slice(0, 32).toString('hex'); + var height = buffer.readUInt32BE(32); + var totalReceived = buffer.readDoubleBE(36); + var balance = buffer.readDoubleBE(44); + + // read 32 byte chunks until exhausted + var appearanceIds = {}; + var txids = []; + var pos = 52; + while(pos < buffer.length) { + var txid = buffer.slice(pos, pos + 32).toString('hex'); + var txidHeight = buffer.readUInt32BE(pos + 32); + txids.push(txid); + appearanceIds[txid] = txidHeight; + pos += 36; + } + + var cache = { + height: height, + hash: hash, + result: { + appearanceIds: appearanceIds, + txids: txids, + totalReceived: totalReceived, + balance: balance, + unconfirmedAppearanceIds: {}, // unconfirmed values are never stored in cache + unconfirmedBalance: 0 + } + }; + + return cache; +}; + +exports.getAddressInfo = function(addressStr) { + var addrObj = bitcore.Address(addressStr); + var hashTypeBuffer = constants.HASH_TYPES_MAP[addrObj.type]; + + return { + hashBuffer: addrObj.hashBuffer, + hashTypeBuffer: hashTypeBuffer, + hashTypeReadable: addrObj.type + }; +}; + +/** + * This function is optimized to return address information about an output script + * without constructing a Bitcore Address instance. + * @param {Script} - An instance of a Bitcore Script + * @param {Network|String} - The network for the address + */ +exports.extractAddressInfoFromScript = function(script, network) { + $.checkArgument(network, 'Second argument is expected to be a network'); + var hashBuffer; + var addressType; + var hashTypeBuffer; + if (script.isPublicKeyHashOut()) { + hashBuffer = script.chunks[2].buf; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + addressType = Address.PayToPublicKeyHash; + } else if (script.isScriptHashOut()) { + hashBuffer = script.chunks[1].buf; + hashTypeBuffer = constants.HASH_TYPES.REDEEMSCRIPT; + addressType = Address.PayToScriptHash; + } else if (script.isPublicKeyOut()) { + var pubkey = script.chunks[0].buf; + var address = Address.fromPublicKey(new PublicKey(pubkey), network); + hashBuffer = address.hashBuffer; + hashTypeBuffer = constants.HASH_TYPES.PUBKEY; + // pay-to-publickey doesn't have an address, however for compatibility + // purposes, we can create an address + addressType = Address.PayToPublicKeyHash; + } else { + return false; + } + return { + hashBuffer: hashBuffer, + hashTypeBuffer: hashTypeBuffer, + addressType: addressType + }; +}; + +module.exports = exports; diff --git a/lib/services/address/history.js b/lib/services/address/history.js new file mode 100644 index 00000000..88b5c96b --- /dev/null +++ b/lib/services/address/history.js @@ -0,0 +1,266 @@ +'use strict'; + +var bitcore = require('bitcore-lib'); +var async = require('async'); +var _ = bitcore.deps._; + +var constants = require('./constants'); + +/** + * This represents an instance that keeps track of data over a series of + * asynchronous I/O calls to get the transaction history for a group of + * addresses. History can be queried by start and end block heights to limit large sets + * of results (uses leveldb key streaming). + */ +function AddressHistory(args) { + this.node = args.node; + this.options = args.options; + + if(Array.isArray(args.addresses)) { + this.addresses = args.addresses; + } else { + this.addresses = [args.addresses]; + } + + this.maxHistoryQueryLength = args.options.maxHistoryQueryLength || constants.MAX_HISTORY_QUERY_LENGTH; + this.maxAddressesQuery = args.options.maxAddressesQuery || constants.MAX_ADDRESSES_QUERY; + this.maxAddressesLimit = args.options.maxAddressesLimit || constants.MAX_ADDRESSES_LIMIT; + + this.addressStrings = []; + for (var i = 0; i < this.addresses.length; i++) { + var address = this.addresses[i]; + if (address instanceof bitcore.Address) { + this.addressStrings.push(address.toString()); + } else if (_.isString(address)) { + this.addressStrings.push(address); + } else { + throw new TypeError('Addresses are expected to be strings'); + } + } + + this.detailedArray = []; +} + +AddressHistory.prototype._mergeAndSortTxids = function(summaries) { + var appearanceIds = {}; + var unconfirmedAppearanceIds = {}; + + for (var i = 0; i < summaries.length; i++) { + var summary = summaries[i]; + for (var key in summary.appearanceIds) { + appearanceIds[key] = summary.appearanceIds[key]; + delete summary.appearanceIds[key]; + } + for (var unconfirmedKey in summary.unconfirmedAppearanceIds) { + unconfirmedAppearanceIds[unconfirmedKey] = summary.unconfirmedAppearanceIds[unconfirmedKey]; + delete summary.unconfirmedAppearanceIds[key]; + } + } + var confirmedTxids = Object.keys(appearanceIds); + confirmedTxids.sort(function(a, b) { + // Confirmed are sorted by height + return appearanceIds[a] - appearanceIds[b]; + }); + var unconfirmedTxids = Object.keys(unconfirmedAppearanceIds); + unconfirmedTxids.sort(function(a, b) { + // Unconfirmed are sorted by timestamp + return unconfirmedAppearanceIds[a] - unconfirmedAppearanceIds[b]; + }); + return confirmedTxids.concat(unconfirmedTxids); +}; + +/** + * This function will give detailed history for the configured + * addresses. See AddressService.prototype.getAddressHistory + * for complete documentation about options and response format. + */ +AddressHistory.prototype.get = function(callback) { + var self = this; + if (this.addresses.length > this.maxAddressesQuery) { + return callback(new TypeError('Maximum number of addresses (' + this.maxAddressesQuery + ') exceeded')); + } + + var opts = _.clone(this.options); + opts.noBalance = true; + + if (this.addresses.length === 1) { + var address = this.addresses[0]; + self.node.services.address.getAddressSummary(address, opts, function(err, summary) { + if (err) { + return callback(err); + } + return self._paginateWithDetails.call(self, summary.txids, callback); + }); + } else { + + opts.fullTxList = true; + async.mapLimit( + self.addresses, + self.maxAddressesLimit, + function(address, next) { + self.node.services.address.getAddressSummary(address, opts, next); + }, + function(err, summaries) { + if (err) { + return callback(err); + } + var txids = self._mergeAndSortTxids(summaries); + return self._paginateWithDetails.call(self, txids, callback); + } + ); + } + +}; + +AddressHistory.prototype._paginateWithDetails = function(allTxids, callback) { + var self = this; + var totalCount = allTxids.length; + + // Slice the page starting with the most recent + var txids; + if (self.options.from >= 0 && self.options.to >= 0) { + var fromOffset = Math.max(0, totalCount - self.options.from); + var toOffset = Math.max(0, totalCount - self.options.to); + txids = allTxids.slice(toOffset, fromOffset); + } else { + txids = allTxids; + } + + // Verify that this query isn't too long + if (txids.length > self.maxHistoryQueryLength) { + return callback(new Error( + 'Maximum length query (' + self.maxHistoryQueryLength + ') exceeded for address(es): ' + + self.addresses.join(',') + )); + } + + // Reverse to include most recent at the top + txids.reverse(); + + async.eachSeries( + txids, + function(txid, next) { + self.getDetailedInfo(txid, next); + }, + function(err) { + if (err) { + return callback(err); + } + callback(null, { + totalCount: totalCount, + items: self.detailedArray + }); + } + ); + +}; + +/** + * This function will transform items from the combinedArray into + * the detailedArray with the full transaction, satoshis and confirmation. + * @param {Object} txInfo - An item from the `combinedArray` + * @param {Function} next + */ +AddressHistory.prototype.getDetailedInfo = function(txid, next) { + var self = this; + var queryMempool = _.isUndefined(self.options.queryMempool) ? true : self.options.queryMempool; + + self.node.services.db.getTransactionWithBlockInfo( + txid, + queryMempool, + function(err, transaction) { + if (err) { + return next(err); + } + + transaction.populateInputs(self.node.services.db, [], function(err) { + if (err) { + return next(err); + } + + var addressDetails = self.getAddressDetailsForTransaction(transaction); + + self.detailedArray.push({ + addresses: addressDetails.addresses, + satoshis: addressDetails.satoshis, + height: transaction.__height, + confirmations: self.getConfirmationsDetail(transaction), + timestamp: transaction.__timestamp, + // TODO bitcore-lib should return null instead of throwing error on coinbase + fees: !transaction.isCoinbase() ? transaction.getFee() : null, + tx: transaction + }); + + next(); + }); + } + ); +}; + +/** + * A helper function for `getDetailedInfo` for getting the confirmations. + * @param {Transaction} transaction - A transaction with a populated __height value. + */ +AddressHistory.prototype.getConfirmationsDetail = function(transaction) { + var confirmations = 0; + if (transaction.__height >= 0) { + confirmations = this.node.services.db.tip.__height - transaction.__height + 1; + } + return confirmations; +}; + +AddressHistory.prototype.getAddressDetailsForTransaction = function(transaction) { + var result = { + addresses: {}, + satoshis: 0 + }; + + for (var inputIndex = 0; inputIndex < transaction.inputs.length; inputIndex++) { + var input = transaction.inputs[inputIndex]; + if (!input.script) { + continue; + } + var inputAddress = input.script.toAddress(this.node.network); + if (inputAddress) { + var inputAddressString = inputAddress.toString(); + if (this.addressStrings.indexOf(inputAddressString) >= 0) { + if (!result.addresses[inputAddressString]) { + result.addresses[inputAddressString] = { + inputIndexes: [inputIndex], + outputIndexes: [] + }; + } else { + result.addresses[inputAddressString].inputIndexes.push(inputIndex); + } + result.satoshis -= input.output.satoshis; + } + } + } + + for (var outputIndex = 0; outputIndex < transaction.outputs.length; outputIndex++) { + var output = transaction.outputs[outputIndex]; + if (!output.script) { + continue; + } + var outputAddress = output.script.toAddress(this.node.network); + if (outputAddress) { + var outputAddressString = outputAddress.toString(); + if (this.addressStrings.indexOf(outputAddressString) >= 0) { + if (!result.addresses[outputAddressString]) { + result.addresses[outputAddressString] = { + inputIndexes: [], + outputIndexes: [outputIndex] + }; + } else { + result.addresses[outputAddressString].outputIndexes.push(outputIndex); + } + result.satoshis += output.satoshis; + } + } + } + + return result; + +}; + +module.exports = AddressHistory; diff --git a/lib/services/address/index.js b/lib/services/address/index.js new file mode 100644 index 00000000..2896a178 --- /dev/null +++ b/lib/services/address/index.js @@ -0,0 +1,1614 @@ +'use strict'; + +var fs = require('fs'); +var BaseService = require('../../service'); +var inherits = require('util').inherits; +var async = require('async'); +var mkdirp = require('mkdirp'); +var index = require('../../'); +var log = index.log; +var errors = index.errors; +var bitcore = require('bitcore-lib'); +var Networks = bitcore.Networks; +var levelup = require('levelup'); +var leveldown = require('leveldown'); +var memdown = require('memdown'); +var $ = bitcore.util.preconditions; +var _ = bitcore.deps._; +var Hash = bitcore.crypto.Hash; +var EventEmitter = require('events').EventEmitter; +var Address = bitcore.Address; +var AddressHistory = require('./history'); +var constants = require('./constants'); +var encoding = require('./encoding'); +var InputsTransformStream = require('./streams/inputs-transform'); +var OutputsTransformStream = require('./streams/outputs-transform'); + + +/** + * The Address Service builds upon the Database Service and the Bitcoin Service to add additional + * functionality for getting information by base58check encoded addresses. This includes getting the + * balance for an address, the history for a collection of addresses, and unspent outputs for + * constructing transactions. This is typically the core functionality for building a wallet. + * @param {Object} options + * @param {Node} options.node - An instance of the node + * @param {String} options.name - An optional name of the service + */ +var AddressService = function(options) { + BaseService.call(this, options); + + this.subscriptions = {}; + this.subscriptions['address/transaction'] = {}; + this.subscriptions['address/balance'] = {}; + + this._bitcoindTransactionListener = this.transactionHandler.bind(this); + this._bitcoindTransactionLeaveListener = this.transactionLeaveHandler.bind(this); + this.node.services.bitcoind.on('tx', this._bitcoindTransactionListener); + this.node.services.bitcoind.on('txleave', this._bitcoindTransactionLeaveListener); + + this.maxInputsQueryLength = options.maxInputsQueryLength || constants.MAX_INPUTS_QUERY_LENGTH; + this.maxOutputsQueryLength = options.maxOutputsQueryLength || constants.MAX_OUTPUTS_QUERY_LENGTH; + + this._setMempoolIndexPath(); + if (options.mempoolMemoryIndex) { + this.levelupStore = memdown; + } else { + this.levelupStore = leveldown; + } + this.mempoolIndex = null; // Used for larger mempool indexes + this.mempoolSpentIndex = {}; // Used for small quick synchronous lookups + this.mempoolAddressIndex = {}; // Used to check if an address is on the spend pool +}; + +inherits(AddressService, BaseService); + +AddressService.dependencies = [ + 'bitcoind', + 'db' +]; + +AddressService.prototype.start = function(callback) { + var self = this; + + async.series([ + + function(next) { + // Flush any existing mempool index + if (fs.existsSync(self.mempoolIndexPath)) { + leveldown.destroy(self.mempoolIndexPath, next); + } else { + setImmediate(next); + } + }, + function(next) { + // Setup new mempool index + if (!fs.existsSync(self.mempoolIndexPath)) { + mkdirp(self.mempoolIndexPath, next); + } else { + setImmediate(next); + } + }, + function(next) { + self.mempoolIndex = levelup( + self.mempoolIndexPath, + { + db: self.levelupStore, + keyEncoding: 'binary', + valueEncoding: 'binary', + fillCache: false, + maxOpenFiles: 200 + }, + next + ); + } + ], callback); + +}; + +AddressService.prototype.stop = function(callback) { + // TODO Keep track of ongoing db requests before shutting down + this.node.services.bitcoind.removeListener('tx', this._bitcoindTransactionListener); + this.node.services.bitcoind.removeListener('txleave', this._bitcoindTransactionLeaveListener); + this.mempoolIndex.close(callback); +}; + +/** + * This function will set `this.mempoolIndexPath` based on `this.node.network`. + * @private + */ +AddressService.prototype._setMempoolIndexPath = function() { + this.mempoolIndexPath = this._getDBPathFor('bitcore-addressmempool.db'); +}; + +AddressService.prototype._getDBPathFor = function(dbname) { + $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); + var path; + if (this.node.network === Networks.livenet) { + path = this.node.datadir + '/' + dbname; + } else if (this.node.network === Networks.testnet) { + if (this.node.network.regtestEnabled) { + path = this.node.datadir + '/regtest/' + dbname; + } else { + path = this.node.datadir + '/testnet3/' + dbname; + } + } else { + throw new Error('Unknown network: ' + this.network); + } + return path; +}; + +/** + * Called by the Node to get the available API methods for this service, + * that can be exposed over the JSON-RPC interface. + */ +AddressService.prototype.getAPIMethods = function() { + return [ + ['getBalance', this, this.getBalance, 2], + ['getOutputs', this, this.getOutputs, 2], + ['getUnspentOutputs', this, this.getUnspentOutputs, 2], + ['getInputForOutput', this, this.getInputForOutput, 2], + ['isSpent', this, this.isSpent, 2], + ['getAddressHistory', this, this.getAddressHistory, 2], + ['getAddressSummary', this, this.getAddressSummary, 1] + ]; +}; + +/** + * Called by the Bus to get the available events for this service. + */ +AddressService.prototype.getPublishEvents = function() { + return [ + { + name: 'address/transaction', + scope: this, + subscribe: this.subscribe.bind(this, 'address/transaction'), + unsubscribe: this.unsubscribe.bind(this, 'address/transaction') + }, + { + name: 'address/balance', + scope: this, + subscribe: this.subscribe.bind(this, 'address/balance'), + unsubscribe: this.unsubscribe.bind(this, 'address/balance') + } + ]; +}; + +/** + * Will process each output of a transaction from the daemon "tx" event, and construct + * an object with the data for the message to be relayed to any subscribers for an address. + * + * @param {Object} messages - An object to collect messages + * @param {Transaction} tx - Instance of the transaction + * @param {Number} outputIndex - The index of the output in the transaction + * @param {Boolean} rejected - If the transaction was rejected by the mempool + */ +AddressService.prototype.transactionOutputHandler = function(messages, tx, outputIndex, rejected) { + var script = tx.outputs[outputIndex].script; + + // If the script is invalid skip + if (!script) { + return; + } + + var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network); + if (!addressInfo) { + return; + } + + addressInfo.hashHex = addressInfo.hashBuffer.toString('hex'); + + // Collect data to publish to address subscribers + if (messages[addressInfo.hashHex]) { + messages[addressInfo.hashHex].outputIndexes.push(outputIndex); + } else { + messages[addressInfo.hashHex] = { + tx: tx, + outputIndexes: [outputIndex], + addressInfo: addressInfo, + rejected: rejected + }; + } +}; + +/** + * This will handle data from the daemon "txleave" that a transaction has left the mempool. + * @param {Object} txInfo - The data from the daemon.on('txleave') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {String} txInfo.hash - The hash of the transaction + */ +AddressService.prototype.transactionLeaveHandler = function(txInfo) { + var tx = bitcore.Transaction().fromBuffer(txInfo.buffer); + this.updateMempoolIndex(tx, false); +}; + +/** + * This will handle data from the daemon "tx" event, go through each of the outputs + * and send messages by calling `transactionEventHandler` to any subscribers for a + * particular address. + * @param {Object} txInfo - The data from the daemon.on('tx') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool + * @param {String} txInfo.hash - The hash of the transaction + * @param {Function} [callback] - Optional callback + */ +AddressService.prototype.transactionHandler = function(buffer, callback) { + var self = this; + + if (!callback) { + callback = function(err) { + if (err) { + return log.error(err); + } + }; + } + + if (this.node.stopping) { + return callback(); + } + + // Basic transaction format is handled by the daemon + // and we can safely assume the buffer is properly formatted. + var tx = bitcore.Transaction().fromBuffer(buffer); + + var messages = {}; + + var mempool = false; // txInfo.mempool + + var outputsLength = tx.outputs.length; + for (var i = 0; i < outputsLength; i++) { + // TODO missing txInfo.mempool + this.transactionOutputHandler(messages, tx, i, mempool); + } + + function finish(err) { + if (err) { + return callback(err); + } + for (var key in messages) { + self.transactionEventHandler(messages[key]); + self.balanceEventHandler(null, messages[key].addressInfo); + } + callback(); + } + + if (mempool) { + self.updateMempoolIndex(tx, true, finish); + } else { + setImmediate(finish); + } + +}; + +AddressService.prototype._updateAddressIndex = function(key, add) { + var currentValue = this.mempoolAddressIndex[key] || 0; + + if(add) { + if (currentValue > 0) { + this.mempoolAddressIndex[key] = currentValue + 1; + } else { + this.mempoolAddressIndex[key] = 1; + } + } else { + if (currentValue <= 1) { + delete this.mempoolAddressIndex[key]; + } else { + this.mempoolAddressIndex[key]--; + } + } +}; + + +/** + * This function will update the mempool address index with the necessary + * information for further lookups. + * @param {Transaction} - An instance of a Bitcore Transaction + * @param {Boolean} - Add/remove from the index + */ +AddressService.prototype.updateMempoolIndex = function(tx, add, callback) { + /* jshint maxstatements: 100 */ + + var operations = []; + var timestampBuffer = new Buffer(new Array(8)); + timestampBuffer.writeDoubleBE(new Date().getTime()); + + var action = 'put'; + if (!add) { + action = 'del'; + } + + var txid = tx.hash; + var txidBuffer = new Buffer(txid, 'hex'); + + var outputLength = tx.outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = tx.outputs[outputIndex]; + if (!output.script) { + continue; + } + var addressInfo = encoding.extractAddressInfoFromScript(output.script, this.node.network); + if (!addressInfo) { + continue; + } + + var addressIndexKey = encoding.encodeMempoolAddressIndexKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer); + + this._updateAddressIndex(addressIndexKey, add); + + // Update output index + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + + var outKey = Buffer.concat([ + constants.MEMPREFIXES.OUTPUTS, + addressInfo.hashBuffer, + addressInfo.hashTypeBuffer, + txidBuffer, + outputIndexBuffer + ]); + + var outValue = encoding.encodeOutputMempoolValue( + output.satoshis, + timestampBuffer, + output._scriptBuffer + ); + + operations.push({ + type: action, + key: outKey, + value: outValue + }); + + } + var inputLength = tx.inputs.length; + for (var inputIndex = 0; inputIndex < inputLength; inputIndex++) { + + var input = tx.inputs[inputIndex]; + + var inputOutputIndexBuffer = new Buffer(4); + inputOutputIndexBuffer.writeUInt32BE(input.outputIndex); + + // Add an additional small spent index for fast synchronous lookups + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + input.prevTxId, + input.outputIndex + ); + if (add) { + this.mempoolSpentIndex[spentIndexSyncKey] = true; + } else { + delete this.mempoolSpentIndex[spentIndexSyncKey]; + } + + // Add a more detailed spent index with values + var spentIndexKey = Buffer.concat([ + constants.MEMPREFIXES.SPENTSMAP, + input.prevTxId, + inputOutputIndexBuffer + ]); + var inputIndexBuffer = new Buffer(4); + inputIndexBuffer.writeUInt32BE(inputIndex); + var inputIndexValue = Buffer.concat([ + txidBuffer, + inputIndexBuffer + ]); + operations.push({ + type: action, + key: spentIndexKey, + value: inputIndexValue + }); + + // Update input index + var inputHashBuffer; + var inputHashType; + if (input.script.isPublicKeyHashIn()) { + inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[1].buf); + inputHashType = constants.HASH_TYPES.PUBKEY; + } else if (input.script.isScriptHashIn()) { + inputHashBuffer = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); + inputHashType = constants.HASH_TYPES.REDEEMSCRIPT; + } else { + continue; + } + var inputKey = Buffer.concat([ + constants.MEMPREFIXES.SPENTS, + inputHashBuffer, + inputHashType, + input.prevTxId, + inputOutputIndexBuffer + ]); + var inputValue = Buffer.concat([ + txidBuffer, + inputIndexBuffer, + timestampBuffer + ]); + operations.push({ + type: action, + key: inputKey, + value: inputValue + }); + + var addressIndexKey = encoding.encodeMempoolAddressIndexKey(inputHashBuffer, inputHashType); + + this._updateAddressIndex(addressIndexKey, add); + } + + if (!callback) { + callback = function(err) { + if (err) { + return log.error(err); + } + }; + } + + this.mempoolIndex.batch(operations, callback); +}; + +/** + * The Database Service will run this function when blocks are connected and + * disconnected to the chain during syncing and reorganizations. + * @param {Block} block - An instance of a Bitcore Block + * @param {Boolean} addOutput - If the block is being removed or added to the chain + * @param {Function} callback + */ +AddressService.prototype.blockHandler = function(block, addOutput, callback) { + var txs = block.transactions; + var height = block.__height; + + var action = 'put'; + if (!addOutput) { + action = 'del'; + } + + var operations = []; + + var transactionLength = txs.length; + for (var i = 0; i < transactionLength; i++) { + + var tx = txs[i]; + var txid = tx.id; + var txidBuffer = new Buffer(txid, 'hex'); + var inputs = tx.inputs; + var outputs = tx.outputs; + + // Subscription messages + var txmessages = {}; + + var outputLength = outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = outputs[outputIndex]; + + var script = output.script; + + if(!script) { + log.debug('Invalid script'); + continue; + } + + var addressInfo = encoding.extractAddressInfoFromScript(script, this.node.network); + if (!addressInfo) { + continue; + } + + // We need to use the height for indexes (and not the timestamp) because the + // the timestamp has unreliable sequential ordering. The next block + // can have a time that is previous to the previous block (however not + // less than the mean of the 11 previous blocks) and not greater than 2 + // hours in the future. + var key = encoding.encodeOutputKey(addressInfo.hashBuffer, addressInfo.hashTypeBuffer, + height, txidBuffer, outputIndex); + var value = encoding.encodeOutputValue(output.satoshis, output._scriptBuffer); + operations.push({ + type: action, + key: key, + value: value + }); + + addressInfo.hashHex = addressInfo.hashBuffer.toString('hex'); + + // Collect data for subscribers + if (txmessages[addressInfo.hashHex]) { + txmessages[addressInfo.hashHex].outputIndexes.push(outputIndex); + } else { + txmessages[addressInfo.hashHex] = { + tx: tx, + height: height, + outputIndexes: [outputIndex], + addressInfo: addressInfo, + timestamp: block.header.timestamp + }; + } + + this.balanceEventHandler(block, addressInfo); + + } + + // Publish events to any subscribers for this transaction + for (var addressKey in txmessages) { + this.transactionEventHandler(txmessages[addressKey]); + } + + if(tx.isCoinbase()) { + continue; + } + + for(var inputIndex = 0; inputIndex < inputs.length; inputIndex++) { + + var input = inputs[inputIndex]; + var inputHash; + var inputHashType; + + if (input.script.isPublicKeyHashIn()) { + inputHash = Hash.sha256ripemd160(input.script.chunks[1].buf); + inputHashType = constants.HASH_TYPES.PUBKEY; + } else if (input.script.isScriptHashIn()) { + inputHash = Hash.sha256ripemd160(input.script.chunks[input.script.chunks.length - 1].buf); + inputHashType = constants.HASH_TYPES.REDEEMSCRIPT; + } else { + continue; + } + + var prevTxIdBuffer = new Buffer(input.prevTxId, 'hex'); + + // To be able to query inputs by address and spent height + var inputKey = encoding.encodeInputKey(inputHash, inputHashType, height, prevTxIdBuffer, input.outputIndex); + var inputValue = encoding.encodeInputValue(txidBuffer, inputIndex); + + operations.push({ + type: action, + key: inputKey, + value: inputValue + }); + + // To be able to search for an input spending an output + var inputKeyMap = encoding.encodeInputKeyMap(prevTxIdBuffer, input.outputIndex); + var inputValueMap = encoding.encodeInputValueMap(txidBuffer, inputIndex); + + operations.push({ + type: action, + key: inputKeyMap, + value: inputValueMap + }); + + } + } + + setImmediate(function() { + callback(null, operations); + }); +}; + +/** + * This function is responsible for emitting events to any subscribers to the + * `address/transaction` event. + * @param {Object} obj + * @param {Transaction} obj.tx - The transaction + * @param {Object} obj.addressInfo + * @param {String} obj.addressInfo.hashHex - The hex string of address hash for the subscription + * @param {String} obj.addressInfo.hashBuffer - The address hash buffer + * @param {String} obj.addressInfo.addressType - The address type + * @param {Array} obj.outputIndexes - Indexes of the inputs that includes the address + * @param {Array} obj.inputIndexes - Indexes of the outputs that includes the address + * @param {Date} obj.timestamp - The time of the block the transaction was included + * @param {Number} obj.height - The height of the block the transaction was included + * @param {Boolean} obj.rejected - If the transaction was not accepted in the mempool + */ +AddressService.prototype.transactionEventHandler = function(obj) { + if(this.subscriptions['address/transaction'][obj.addressInfo.hashHex]) { + var emitters = this.subscriptions['address/transaction'][obj.addressInfo.hashHex]; + var address = new Address({ + hashBuffer: obj.addressInfo.hashBuffer, + network: this.node.network, + type: obj.addressInfo.addressType + }); + for(var i = 0; i < emitters.length; i++) { + emitters[i].emit('address/transaction', { + rejected: obj.rejected, + height: obj.height, + timestamp: obj.timestamp, + inputIndexes: obj.inputIndexes, + outputIndexes: obj.outputIndexes, + address: address, + tx: obj.tx + }); + } + } +}; + +/** + * The function is responsible for emitting events to any subscribers for the + * `address/balance` event. + * @param {Block} block + * @param {Object} obj + * @param {String} obj.hashHex + * @param {Buffer} obj.hashBuffer + * @param {String} obj.addressType + */ +AddressService.prototype.balanceEventHandler = function(block, obj) { + if(this.subscriptions['address/balance'][obj.hashHex]) { + var emitters = this.subscriptions['address/balance'][obj.hashHex]; + var address = new Address({ + hashBuffer: obj.hashBuffer, + network: this.node.network, + type: obj.addressType + }); + this.getBalance(address, true, function(err, balance) { + if(err) { + return this.emit(err); + } + for(var i = 0; i < emitters.length; i++) { + emitters[i].emit('address/balance', address, balance, block); + } + }); + } +}; + +/** + * The Bus will use this function to subscribe to the available + * events for this service. For information about the available events + * please see `getPublishEvents`. + * @param {String} name - The name of the event + * @param {EventEmitter} emitter - An event emitter instance + * @param {Array} addresses - An array of addresses to subscribe + */ +AddressService.prototype.subscribe = function(name, emitter, addresses) { + $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); + $.checkArgument(Array.isArray(addresses), 'Second argument is expected to be an Array of addresses'); + + for(var i = 0; i < addresses.length; i++) { + var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex'); + if(!this.subscriptions[name][hashHex]) { + this.subscriptions[name][hashHex] = []; + } + this.subscriptions[name][hashHex].push(emitter); + } +}; + +/** + * The Bus will use this function to unsubscribe to the available + * events for this service. + * @param {String} name - The name of the event + * @param {EventEmitter} emitter - An event emitter instance + * @param {Array} addresses - An array of addresses to subscribe + */ +AddressService.prototype.unsubscribe = function(name, emitter, addresses) { + $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); + $.checkArgument(Array.isArray(addresses) || _.isUndefined(addresses), 'Second argument is expected to be an Array of addresses or undefined'); + + if(!addresses) { + return this.unsubscribeAll(name, emitter); + } + + for(var i = 0; i < addresses.length; i++) { + var hashHex = bitcore.Address(addresses[i]).hashBuffer.toString('hex'); + if(this.subscriptions[name][hashHex]) { + var emitters = this.subscriptions[name][hashHex]; + var index = emitters.indexOf(emitter); + if(index > -1) { + emitters.splice(index, 1); + } + } + } +}; + +/** + * A helper function for the `unsubscribe` method to unsubscribe from all addresses. + * @param {String} name - The name of the event + * @param {EventEmitter} emitter - An instance of an event emitter + */ +AddressService.prototype.unsubscribeAll = function(name, emitter) { + $.checkArgument(emitter instanceof EventEmitter, 'First argument is expected to be an EventEmitter'); + + for(var hashHex in this.subscriptions[name]) { + var emitters = this.subscriptions[name][hashHex]; + var index = emitters.indexOf(emitter); + if(index > -1) { + emitters.splice(index, 1); + } + } +}; + +/** + * Will sum the total of all unspent outputs to calculate the balance + * for an address. + * @param {String} address - The base58check encoded address + * @param {Boolean} queryMempool - Include mempool in the results + * @param {Function} callback + */ +AddressService.prototype.getBalance = function(address, queryMempool, callback) { + this.getUnspentOutputs(address, queryMempool, function(err, outputs) { + if(err) { + return callback(err); + } + + var satoshis = outputs.map(function(output) { + return output.satoshis; + }); + + var sum = satoshis.reduce(function(a, b) { + return a + b; + }, 0); + + return callback(null, sum); + }); +}; + +/** + * Will give the input that spends an output if it exists with: + * inputTxId - The input txid hex string + * inputIndex - A number with the spending input index + * @param {String|Buffer} txid - The transaction hash with the output + * @param {Number} outputIndex - The output index in the transaction + * @param {Object} options + * @param {Object} options.queryMempool - Include mempool in results + * @param {Function} callback + */ +AddressService.prototype.getInputForOutput = function(txid, outputIndex, options, callback) { + $.checkArgument(_.isNumber(outputIndex)); + $.checkArgument(_.isObject(options)); + $.checkArgument(_.isFunction(callback)); + var self = this; + var txidBuffer; + if (Buffer.isBuffer(txid)) { + txidBuffer = txid; + } else { + txidBuffer = new Buffer(txid, 'hex'); + } + if (options.queryMempool) { + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, outputIndex); + if (this.mempoolSpentIndex[spentIndexSyncKey]) { + return this._getSpentMempool(txidBuffer, outputIndex, callback); + } + } + var key = encoding.encodeInputKeyMap(txidBuffer, outputIndex); + var dbOptions = { + valueEncoding: 'binary', + keyEncoding: 'binary' + }; + this.node.services.db.store.get(key, dbOptions, function(err, buffer) { + if (err instanceof levelup.errors.NotFoundError) { + return callback(null, false); + } else if (err) { + return callback(err); + } + var value = encoding.decodeInputValueMap(buffer); + callback(null, { + inputTxId: value.inputTxId.toString('hex'), + inputIndex: value.inputIndex + }); + }); +}; + +/** + * A streaming equivalent to `getInputs`, and returns a transform stream with data + * emitted in the same format as `getInputs`. + * + * @param {String} addressStr - The relevant address + * @param {Object} options - Additional options for query the outputs + * @param {Number} [options.start] - The relevant start block height + * @param {Number} [options.end] - The relevant end block height + * @param {Function} callback + */ +AddressService.prototype.createInputsStream = function(addressStr, options) { + var inputStream = new InputsTransformStream({ + address: new Address(addressStr, this.node.network), + tipHeight: this.node.services.db.tip.__height + }); + + var stream = this.createInputsDBStream(addressStr, options) + .on('error', function(err) { + // Forward the error + inputStream.emit('error', err); + inputStream.end(); + }).pipe(inputStream); + + return stream; + +}; + +AddressService.prototype.createInputsDBStream = function(addressStr, options) { + var stream; + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + + if (options.start >= 0 && options.end >= 0) { + + var endBuffer = new Buffer(4); + endBuffer.writeUInt32BE(options.end, 0); + + var startBuffer = new Buffer(4); + // Because the key has additional data following it, we don't have an ability + // to use "gte" or "lte" we can only use "gt" and "lt", we therefore need to adjust the number + // to be one value larger to include it. + var adjustedStart = options.start + 1; + startBuffer.writeUInt32BE(adjustedStart, 0); + + stream = this.node.services.db.store.createReadStream({ + gt: Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + endBuffer + ]), + lt: Buffer.concat([ + constants.PREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + startBuffer + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } else { + var allKey = Buffer.concat([constants.PREFIXES.SPENTS, hashBuffer, hashTypeBuffer]); + stream = this.node.services.db.store.createReadStream({ + gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), + lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } + + return stream; +}; + +/** + * Will give inputs that spend previous outputs for an address as an object with: + * address - The base58check encoded address + * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' + * txid - A string of the transaction hash + * outputIndex - A number of corresponding transaction input + * height - The height of the block the transaction was included, will be -1 for mempool transactions + * confirmations - The number of confirmations, will equal 0 for mempool transactions + * + * @param {String} addressStr - The relevant address + * @param {Object} options - Additional options for query the outputs + * @param {Number} [options.start] - The relevant start block height + * @param {Number} [options.end] - The relevant end block height + * @param {Boolean} [options.queryMempool] - Include the mempool in the results + * @param {Function} callback + */ +AddressService.prototype.getInputs = function(addressStr, options, callback) { + + var self = this; + + var inputs = []; + + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + + var stream = this.createInputsStream(addressStr, options); + + stream.on('data', function(input) { + inputs.push(input); + if (inputs.length > self.maxInputsQueryLength) { + log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for address '+ addressStr); + error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached'); + stream.end(); + } + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('finish', function() { + if (error) { + return callback(error); + } + + if(options.queryMempool) { + self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { + if (err) { + return callback(err); + } + inputs = inputs.concat(mempoolInputs); + callback(null, inputs); + }); + } else { + callback(null, inputs); + } + + }); + + return stream; + +}; + +AddressService.prototype._getInputsMempool = function(addressStr, hashBuffer, hashTypeBuffer, callback) { + var self = this; + var mempoolInputs = []; + + var stream = self.mempoolIndex.createReadStream({ + gte: Buffer.concat([ + constants.MEMPREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN + ]), + lte: Buffer.concat([ + constants.MEMPREFIXES.SPENTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MAX + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + + stream.on('data', function(data) { + var txid = data.value.slice(0, 32); + var inputIndex = data.value.readUInt32BE(32); + var timestamp = data.value.readDoubleBE(36); + var input = { + address: addressStr, + hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], + txid: txid.toString('hex'), //TODO use a buffer + inputIndex: inputIndex, + timestamp: timestamp, + height: -1, + confirmations: 0 + }; + mempoolInputs.push(input); + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + callback(null, mempoolInputs); + }); + +}; + +AddressService.prototype._getSpentMempool = function(txidBuffer, outputIndex, callback) { + var outputIndexBuffer = new Buffer(4); + outputIndexBuffer.writeUInt32BE(outputIndex); + var spentIndexKey = Buffer.concat([ + constants.MEMPREFIXES.SPENTSMAP, + txidBuffer, + outputIndexBuffer + ]); + + this.mempoolIndex.get( + spentIndexKey, + function(err, mempoolValue) { + if (err) { + return callback(err); + } + var inputTxId = mempoolValue.slice(0, 32); + var inputIndex = mempoolValue.readUInt32BE(32); + callback(null, { + inputTxId: inputTxId.toString('hex'), + inputIndex: inputIndex + }); + } + ); +}; + +AddressService.prototype.createOutputsStream = function(addressStr, options) { + var outputStream = new OutputsTransformStream({ + address: new Address(addressStr, this.node.network), + tipHeight: this.node.services.db.tip.__height + }); + + var stream = this.createOutputsDBStream(addressStr, options) + .on('error', function(err) { + // Forward the error + outputStream.emit('error', err); + outputStream.end(); + }) + .pipe(outputStream); + + return stream; + +}; + +AddressService.prototype.createOutputsDBStream = function(addressStr, options) { + + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + var stream; + + if (options.start >= 0 && options.end >= 0) { + + var endBuffer = new Buffer(4); + endBuffer.writeUInt32BE(options.end, 0); + + var startBuffer = new Buffer(4); + // Because the key has additional data following it, we don't have an ability + // to use "gte" or "lte" we can only use "gt" and "lt", we therefore need to adjust the number + // to be one value larger to include it. + var startAdjusted = options.start + 1; + startBuffer.writeUInt32BE(startAdjusted, 0); + + stream = this.node.services.db.store.createReadStream({ + gt: Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + endBuffer + ]), + lt: Buffer.concat([ + constants.PREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN, + startBuffer + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } else { + var allKey = Buffer.concat([constants.PREFIXES.OUTPUTS, hashBuffer, hashTypeBuffer]); + stream = this.node.services.db.store.createReadStream({ + gt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MIN]), + lt: Buffer.concat([allKey, constants.SPACER_HEIGHT_MAX]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + } + + return stream; + +}; + +/** + * Will give outputs for an address as an object with: + * address - The base58check encoded address + * hashtype - The type of the address, e.g. 'pubkeyhash' or 'scripthash' + * txid - A string of the transaction hash + * outputIndex - A number of corresponding transaction output + * height - The height of the block the transaction was included, will be -1 for mempool transactions + * satoshis - The satoshis value of the output + * script - The script of the output as a hex string + * confirmations - The number of confirmations, will equal 0 for mempool transactions + * + * @param {String} addressStr - The relevant address + * @param {Object} options - Additional options for query the outputs + * @param {Number} [options.start] - The relevant start block height + * @param {Number} [options.end] - The relevant end block height + * @param {Boolean} [options.queryMempool] - Include the mempool in the results + * @param {Function} callback + */ +AddressService.prototype.getOutputs = function(addressStr, options, callback) { + var self = this; + $.checkArgument(_.isObject(options), 'Second argument is expected to be an options object.'); + $.checkArgument(_.isFunction(callback), 'Third argument is expected to be a callback function.'); + + var addrObj = encoding.getAddressInfo(addressStr); + var hashBuffer = addrObj.hashBuffer; + var hashTypeBuffer = addrObj.hashTypeBuffer; + if (!hashTypeBuffer) { + return callback(new Error('Unknown address type: ' + addrObj.hashTypeReadable + ' for address: ' + addressStr)); + } + + var outputs = []; + var stream = this.createOutputsStream(addressStr, options); + + stream.on('data', function(data) { + outputs.push(data); + if (outputs.length > self.maxOutputsQueryLength) { + log.warn('Tried to query too many outputs (' + self.maxOutputsQueryLength + ') for address ' + addressStr); + error = new Error('Maximum number of outputs (' + self.maxOutputsQueryLength + ') per query reached'); + stream.end(); + } + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('finish', function() { + if (error) { + return callback(error); + } + + if(options.queryMempool) { + self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { + if (err) { + return callback(err); + } + outputs = outputs.concat(mempoolOutputs); + callback(null, outputs); + }); + } else { + callback(null, outputs); + } + }); + + return stream; + +}; + +AddressService.prototype._getOutputsMempool = function(addressStr, hashBuffer, hashTypeBuffer, callback) { + var self = this; + var mempoolOutputs = []; + + var stream = self.mempoolIndex.createReadStream({ + gte: Buffer.concat([ + constants.MEMPREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MIN + ]), + lte: Buffer.concat([ + constants.MEMPREFIXES.OUTPUTS, + hashBuffer, + hashTypeBuffer, + constants.SPACER_MAX + ]), + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + + stream.on('data', function(data) { + // Format of data: + // prefix: 1, hashBuffer: 20, hashTypeBuffer: 1, txid: 32, outputIndex: 4 + var txid = data.key.slice(22, 54); + var outputIndex = data.key.readUInt32BE(54); + var value = encoding.decodeOutputMempoolValue(data.value); + var output = { + address: addressStr, + hashType: constants.HASH_TYPES_READABLE[hashTypeBuffer.toString('hex')], + txid: txid.toString('hex'), //TODO use a buffer + outputIndex: outputIndex, + height: -1, + timestamp: value.timestamp, + satoshis: value.satoshis, + script: value.scriptBuffer.toString('hex'), //TODO use a buffer + confirmations: 0 + }; + mempoolOutputs.push(output); + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + callback(null, mempoolOutputs); + }); + +}; + +/** + * Will give unspent outputs for an address or an array of addresses. + * @param {Array|String} addresses - An array of addresses + * @param {Boolean} queryMempool - Include or exclude the mempool + * @param {Function} callback + */ +AddressService.prototype.getUnspentOutputs = function(addresses, queryMempool, callback) { + var self = this; + + if(!Array.isArray(addresses)) { + addresses = [addresses]; + } + + var utxos = []; + + async.eachSeries(addresses, function(address, next) { + self.getUnspentOutputsForAddress(address, queryMempool, function(err, unspents) { + if(err && err instanceof errors.NoOutputs) { + return next(); + } else if(err) { + return next(err); + } + + utxos = utxos.concat(unspents); + next(); + }); + }, function(err) { + callback(err, utxos); + }); +}; + +/** + * Will give unspent outputs for an address. + * @param {String} address - An address in base58check encoding + * @param {Boolean} queryMempool - Include or exclude the mempool + * @param {Function} callback + */ +AddressService.prototype.getUnspentOutputsForAddress = function(address, queryMempool, callback) { + + var self = this; + + this.getOutputs(address, {queryMempool: queryMempool}, function(err, outputs) { + if (err) { + return callback(err); + } else if(!outputs.length) { + return callback(new errors.NoOutputs('Address ' + address + ' has no outputs'), []); + } + + var opts = { + queryMempool: queryMempool + }; + + var isUnspent = function(output, callback) { + self.isUnspent(output, opts, callback); + }; + + async.filter(outputs, isUnspent, function(results) { + callback(null, results); + }); + }); +}; + +/** + * Will give the inverse of isSpent + * @param {Object} output + * @param {Object} options + * @param {Boolean} options.queryMempool - Include mempool in results + * @param {Function} callback + */ +AddressService.prototype.isUnspent = function(output, options, callback) { + $.checkArgument(_.isFunction(callback)); + this.isSpent(output, options, function(spent) { + callback(!spent); + }); +}; + +/** + * Will determine if an output is spent. + * @param {Object} output - An output as returned from getOutputs + * @param {Object} options + * @param {Boolean} options.queryMempool - Include mempool in results + * @param {Function} callback + */ +AddressService.prototype.isSpent = function(output, options, callback) { + $.checkArgument(_.isFunction(callback)); + var queryMempool = _.isUndefined(options.queryMempool) ? true : options.queryMempool; + var self = this; + var txid = output.prevTxId ? output.prevTxId.toString('hex') : output.txid; + var spent = self.node.services.bitcoind.isSpent(txid, output.outputIndex); + if (!spent && queryMempool) { + var txidBuffer = new Buffer(txid, 'hex'); + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey(txidBuffer, output.outputIndex); + spent = self.mempoolSpentIndex[spentIndexSyncKey] ? true : false; + } + setImmediate(function() { + // TODO error should be the first argument? + callback(spent); + }); +}; + + +/** + * This will give the history for many addresses limited by a range of block heights (to limit + * the database lookup times) and/or paginated to limit the results length. + * + * The response format will be: + * { + * totalCount: 12 // the total number of items there are between the two heights + * items: [ + * { + * addresses: { + * '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX': { + * inputIndexes: [], + * outputIndexes: [0] + * } + * }, + * satoshis: 100, + * height: 300000, + * confirmations: 1, + * timestamp: 1442337090 // in seconds + * fees: 1000 // in satoshis + * tx: + * } + * ] + * } + * @param {Array} addresses - An array of addresses + * @param {Object} options - The options to limit the query + * @param {Number} [options.from] - The pagination "from" index + * @param {Number} [options.to] - The pagination "to" index + * @param {Number} [options.start] - The beginning block height (e.g. 1500 the most recent block height). + * @param {Number} [options.end] - The ending block height (e.g. 0 the older block height, results are inclusive). + * @param {Boolean} [options.queryMempool] - Include the mempool in the query + * @param {Function} callback + */ +AddressService.prototype.getAddressHistory = function(addresses, options, callback) { + var history = new AddressHistory({ + node: this.node, + options: options, + addresses: addresses + }); + history.get(callback); +}; + +/** + * This will give an object with: + * balance - confirmed balance + * unconfirmedBalance - unconfirmed balance + * totalReceived - satoshis received + * totalSpent - satoshis spent + * appearances - number of transactions + * unconfirmedAppearances - number of unconfirmed transactions + * txids - list of txids (unless noTxList is set) + * + * @param {String} address + * @param {Object} options + * @param {Boolean} [options.noTxList] - if set, txid array will not be included + * @param {Function} callback + */ +AddressService.prototype.getAddressSummary = function(addressArg, options, callback) { + var self = this; + + var startTime = new Date(); + var address = new Address(addressArg); + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + async.waterfall([ + function(next) { + self._getAddressConfirmedSummary(address, options, next); + }, + function(result, next) { + self._getAddressMempoolSummary(address, options, result, next); + }, + function(result, next) { + self._setAndSortTxidsFromAppearanceIds(result, next); + } + ], function(err, result) { + if (err) { + return callback(err); + } + + var summary = self._transformAddressSummaryFromResult(result, options); + + var timeDelta = new Date() - startTime; + if (timeDelta > 5000) { + var seconds = Math.round(timeDelta / 1000); + log.warn('Slow (' + seconds + 's) getAddressSummary request for address: ' + address.toString()); + } + + callback(null, summary); + + }); + +}; + +AddressService.prototype._getAddressConfirmedSummary = function(address, options, callback) { + var self = this; + var baseResult = { + appearanceIds: {}, + totalReceived: 0, + balance: 0, + unconfirmedAppearanceIds: {}, + unconfirmedBalance: 0 + }; + + async.waterfall([ + function(next) { + self._getAddressConfirmedInputsSummary(address, baseResult, options, next); + }, + function(result, next) { + self._getAddressConfirmedOutputsSummary(address, result, options, next); + } + ], callback); + +}; + +AddressService.prototype._getAddressConfirmedInputsSummary = function(address, result, options, callback) { + $.checkArgument(address instanceof Address); + var self = this; + var error = null; + var count = 0; + + var inputsStream = self.createInputsStream(address, options); + inputsStream.on('data', function(input) { + var txid = input.txid; + result.appearanceIds[txid] = input.height; + + count++; + + if (count > self.maxInputsQueryLength) { + log.warn('Tried to query too many inputs (' + self.maxInputsQueryLength + ') for summary of address ' + address.toString()); + error = new Error('Maximum number of inputs (' + self.maxInputsQueryLength + ') per query reached'); + inputsStream.end(); + } + + }); + + inputsStream.on('error', function(err) { + error = err; + }); + + inputsStream.on('end', function() { + if (error) { + return callback(error); + } + callback(null, result); + }); +}; + +AddressService.prototype._getAddressConfirmedOutputsSummary = function(address, result, options, callback) { + $.checkArgument(address instanceof Address); + $.checkArgument(!_.isUndefined(result) && + !_.isUndefined(result.appearanceIds) && + !_.isUndefined(result.unconfirmedAppearanceIds)); + + var self = this; + var count = 0; + + var outputStream = self.createOutputsStream(address, options); + + outputStream.on('data', function(output) { + + var txid = output.txid; + var outputIndex = output.outputIndex; + result.totalReceived += output.satoshis; + result.appearanceIds[txid] = output.height; + + if(!options.noBalance) { + + // Bitcoind's isSpent only works for confirmed transactions + var spentDB = self.node.services.bitcoind.isSpent(txid, outputIndex); + + if(!spentDB) { + result.balance += output.satoshis; + } + + if(options.queryMempool) { + // Check to see if this output is spent in the mempool and if so + // we will subtract it from the unconfirmedBalance (a.k.a unconfirmedDelta) + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + new Buffer(txid, 'hex'), // TODO: get buffer directly + outputIndex + ); + var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; + if(spentMempool) { + result.unconfirmedBalance -= output.satoshis; + } + } + } + + count++; + + if (count > self.maxOutputsQueryLength) { + log.warn('Tried to query too many outputs (' + self.maxOutputsQueryLength + ') for summary of address ' + address.toString()); + error = new Error('Maximum number of outputs (' + self.maxOutputsQueryLength + ') per query reached'); + outputStream.end(); + } + + }); + + var error = null; + + outputStream.on('error', function(err) { + error = err; + }); + + outputStream.on('end', function() { + if (error) { + return callback(error); + } + callback(null, result); + }); + +}; + +AddressService.prototype._setAndSortTxidsFromAppearanceIds = function(result, callback) { + result.txids = Object.keys(result.appearanceIds); + result.txids.sort(function(a, b) { + return result.appearanceIds[a] - result.appearanceIds[b]; + }); + result.unconfirmedTxids = Object.keys(result.unconfirmedAppearanceIds); + result.unconfirmedTxids.sort(function(a, b) { + return result.unconfirmedAppearanceIds[a] - result.unconfirmedAppearanceIds[b]; + }); + callback(null, result); +}; + +AddressService.prototype._getAddressMempoolSummary = function(address, options, result, callback) { + var self = this; + + // Skip if the options do not want to include the mempool + if (!options.queryMempool) { + return callback(null, result); + } + + var addressStr = address.toString(); + var hashBuffer = address.hashBuffer; + var hashTypeBuffer = constants.HASH_TYPES_MAP[address.type]; + var addressIndexKey = encoding.encodeMempoolAddressIndexKey(hashBuffer, hashTypeBuffer); + + if(!this.mempoolAddressIndex[addressIndexKey]) { + return callback(null, result); + } + + async.waterfall([ + function(next) { + self._getInputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolInputs) { + if (err) { + return next(err); + } + for(var i = 0; i < mempoolInputs.length; i++) { + var input = mempoolInputs[i]; + result.unconfirmedAppearanceIds[input.txid] = input.timestamp; + } + next(null, result); + }); + + }, function(result, next) { + self._getOutputsMempool(addressStr, hashBuffer, hashTypeBuffer, function(err, mempoolOutputs) { + if (err) { + return next(err); + } + for(var i = 0; i < mempoolOutputs.length; i++) { + var output = mempoolOutputs[i]; + + result.unconfirmedAppearanceIds[output.txid] = output.timestamp; + + if(!options.noBalance) { + var spentIndexSyncKey = encoding.encodeSpentIndexSyncKey( + new Buffer(output.txid, 'hex'), // TODO: get buffer directly + output.outputIndex + ); + var spentMempool = self.mempoolSpentIndex[spentIndexSyncKey]; + // Only add this to the balance if it's not spent in the mempool already + if(!spentMempool) { + result.unconfirmedBalance += output.satoshis; + } + } + } + next(null, result); + }); + } + ], callback); +}; + +AddressService.prototype._transformAddressSummaryFromResult = function(result, options) { + + var confirmedTxids = result.txids; + var unconfirmedTxids = result.unconfirmedTxids; + + var summary = { + totalReceived: result.totalReceived, + totalSpent: result.totalReceived - result.balance, + balance: result.balance, + appearances: confirmedTxids.length, + unconfirmedBalance: result.unconfirmedBalance, + unconfirmedAppearances: unconfirmedTxids.length + }; + + if (options.fullTxList) { + summary.appearanceIds = result.appearanceIds; + summary.unconfirmedAppearanceIds = result.unconfirmedAppearanceIds; + } else if (!options.noTxList) { + summary.txids = confirmedTxids.concat(unconfirmedTxids); + } + + return summary; + +}; + +module.exports = AddressService; diff --git a/lib/services/address/streams/inputs-transform.js b/lib/services/address/streams/inputs-transform.js new file mode 100644 index 00000000..8b8f71d3 --- /dev/null +++ b/lib/services/address/streams/inputs-transform.js @@ -0,0 +1,40 @@ +'use strict'; + +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var bitcore = require('bitcore-lib'); +var encodingUtil = require('../encoding'); +var $ = bitcore.util.preconditions; + +function InputsTransformStream(options) { + $.checkArgument(options.address instanceof bitcore.Address); + Transform.call(this, { + objectMode: true + }); + this._address = options.address; + this._addressStr = this._address.toString(); + this._tipHeight = options.tipHeight; +} +inherits(InputsTransformStream, Transform); + +InputsTransformStream.prototype._transform = function(chunk, encoding, callback) { + var self = this; + + var key = encodingUtil.decodeInputKey(chunk.key); + var value = encodingUtil.decodeInputValue(chunk.value); + + var input = { + address: this._addressStr, + hashType: this._address.type, + txid: value.txid.toString('hex'), + inputIndex: value.inputIndex, + height: key.height, + confirmations: this._tipHeight - key.height + 1 + }; + + self.push(input); + callback(); + +}; + +module.exports = InputsTransformStream; diff --git a/lib/services/address/streams/outputs-transform.js b/lib/services/address/streams/outputs-transform.js new file mode 100644 index 00000000..b9c8e8d3 --- /dev/null +++ b/lib/services/address/streams/outputs-transform.js @@ -0,0 +1,42 @@ +'use strict'; + +var Transform = require('stream').Transform; +var inherits = require('util').inherits; +var bitcore = require('bitcore-lib'); +var encodingUtil = require('../encoding'); +var $ = bitcore.util.preconditions; + +function OutputsTransformStream(options) { + Transform.call(this, { + objectMode: true + }); + $.checkArgument(options.address instanceof bitcore.Address); + this._address = options.address; + this._addressStr = this._address.toString(); + this._tipHeight = options.tipHeight; +} +inherits(OutputsTransformStream, Transform); + +OutputsTransformStream.prototype._transform = function(chunk, encoding, callback) { + var self = this; + + var key = encodingUtil.decodeOutputKey(chunk.key); + var value = encodingUtil.decodeOutputValue(chunk.value); + + var output = { + address: this._addressStr, + hashType: this._address.type, + txid: key.txid.toString('hex'), //TODO use a buffer + outputIndex: key.outputIndex, + height: key.height, + satoshis: value.satoshis, + script: value.scriptBuffer.toString('hex'), //TODO use a buffer + confirmations: this._tipHeight - key.height + 1 + }; + + self.push(output); + callback(); + +}; + +module.exports = OutputsTransformStream; diff --git a/lib/services/bitcoind.js b/lib/services/bitcoind.js index 21480251..f505d736 100644 --- a/lib/services/bitcoind.js +++ b/lib/services/bitcoind.js @@ -156,6 +156,7 @@ Bitcoin.prototype._initClients = function() { * Called by Node to determine the available API methods. */ Bitcoin.prototype.getAPIMethods = function() { + return []; var methods = [ ['getBlock', this, this.getBlock, 1], ['getRawBlock', this, this.getRawBlock, 1], diff --git a/lib/services/db.js b/lib/services/db.js new file mode 100644 index 00000000..54496782 --- /dev/null +++ b/lib/services/db.js @@ -0,0 +1,804 @@ +'use strict'; + +var util = require('util'); +var fs = require('fs'); +var async = require('async'); +var levelup = require('levelup'); +var leveldown = require('leveldown'); +var mkdirp = require('mkdirp'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; +var Networks = bitcore.Networks; +var Block = bitcore.Block; +var $ = bitcore.util.preconditions; +var index = require('../'); +var errors = index.errors; +var log = index.log; +var Transaction = require('../transaction'); +var Service = require('../service'); + +/** + * This service synchronizes a leveldb database with bitcoin block chain by connecting and + * disconnecting blocks to build new indexes that can be queried. Other services can extend + * the data that is indexed by implementing a `blockHandler` method. + * + * @param {Object} options + * @param {Node} options.node - A reference to the node + * @param {Node} options.store - A levelup backend store + */ +function DB(options) { + /* jshint maxstatements: 20 */ + + if (!(this instanceof DB)) { + return new DB(options); + } + if (!options) { + options = {}; + } + + Service.call(this, options); + + // Used to keep track of the version of the indexes + // to determine during an upgrade if a reindex is required + this.version = 2; + + this.tip = null; + this.genesis = null; + + $.checkState(this.node.network, 'Node is expected to have a "network" property'); + this.network = this.node.network; + + this._setDataPath(); + + this.maxOpenFiles = options.maxOpenFiles || DB.DEFAULT_MAX_OPEN_FILES; + this.maxTransactionLimit = options.maxTransactionLimit || DB.MAX_TRANSACTION_LIMIT; + + this.levelupStore = leveldown; + if (options.store) { + this.levelupStore = options.store; + } + + this.retryInterval = 60000; + + this.subscriptions = { + transaction: [], + block: [] + }; +} + +util.inherits(DB, Service); + +DB.dependencies = ['bitcoind']; + +DB.PREFIXES = { + VERSION: new Buffer('ff', 'hex'), + BLOCKS: new Buffer('01', 'hex'), + TIP: new Buffer('04', 'hex') +}; + +// The maximum number of transactions to query at once +// Used for populating previous inputs +DB.MAX_TRANSACTION_LIMIT = 5; + +// The default maxiumum number of files open for leveldb +DB.DEFAULT_MAX_OPEN_FILES = 200; + +/** + * This function will set `this.dataPath` based on `this.node.network`. + * @private + */ +DB.prototype._setDataPath = function() { + $.checkState(this.node.datadir, 'Node is expected to have a "datadir" property'); + if (this.node.network === Networks.livenet) { + this.dataPath = this.node.datadir + '/bitcore-node.db'; + } else if (this.node.network === Networks.testnet) { + if (this.node.network.regtestEnabled) { + this.dataPath = this.node.datadir + '/regtest/bitcore-node.db'; + } else { + this.dataPath = this.node.datadir + '/testnet3/bitcore-node.db'; + } + } else { + throw new Error('Unknown network: ' + this.network); + } +}; + +DB.prototype._checkVersion = function(callback) { + var self = this; + var options = { + keyEncoding: 'binary', + valueEncoding: 'binary' + }; + self.store.get(DB.PREFIXES.TIP, options, function(err) { + if (err instanceof levelup.errors.NotFoundError) { + // The database is brand new and doesn't have a tip stored + // we can skip version checking + return callback(); + } else if (err) { + return callback(err); + } + self.store.get(DB.PREFIXES.VERSION, options, function(err, buffer) { + var version; + if (err instanceof levelup.errors.NotFoundError) { + // The initial version (1) of the database didn't store the version number + version = 1; + } else if (err) { + return callback(err); + } else { + version = buffer.readUInt32BE(); + } + if (self.version !== version) { + var helpUrl = 'https://github.com/bitpay/bitcore-node/blob/master/docs/services/db.md#how-to-reindex'; + return callback(new Error( + 'The version of the database "' + version + '" does not match the expected version "' + + self.version + '". A recreation of "' + self.dataPath + '" (can take several hours) is ' + + 'required or to switch versions of software to match. Please see ' + helpUrl + + ' for more information.' + )); + } + callback(); + }); + }); +}; + +DB.prototype._setVersion = function(callback) { + var versionBuffer = new Buffer(new Array(4)); + versionBuffer.writeUInt32BE(this.version); + this.store.put(DB.PREFIXES.VERSION, versionBuffer, callback); +}; + +/** + * Called by Node to start the service. + * @param {Function} callback + */ +DB.prototype.start = function(callback) { + + var self = this; + if (!fs.existsSync(this.dataPath)) { + mkdirp.sync(this.dataPath); + } + + this.genesis = Block.fromBuffer(this.node.services.bitcoind.genesisBuffer); + this.store = levelup(this.dataPath, { db: this.levelupStore, maxOpenFiles: this.maxOpenFiles }); + this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + + this.once('ready', function() { + log.info('Bitcoin Database Ready'); + + // Notify that there is a new tip + self.node.services.bitcoind.on('tip', function(height) { + if(!self.node.stopping) { + self.sync(); + } + }); + }); + + async.series([ + function(next) { + self._checkVersion(next); + }, + function(next) { + self._setVersion(next); + } + ], function(err) { + if (err) { + return callback(err); + } + self.loadTip(function(err) { + if (err) { + return callback(err); + } + + self.sync(); + self.emit('ready'); + setImmediate(callback); + }); + }); +}; + +/** + * Called by Node to stop the service + * @param {Function} callback + */ +DB.prototype.stop = function(callback) { + var self = this; + + // Wait until syncing stops and all db operations are completed before closing leveldb + async.whilst(function() { + return self.bitcoindSyncing; + }, function(next) { + setTimeout(next, 10); + }, function() { + self.store.close(callback); + }); +}; + +/** + * Will give information about the database from bitcoin. + * @param {Function} callback + */ +DB.prototype.getInfo = function(callback) { + var self = this; + setImmediate(function() { + var info = self.node.bitcoind.getInfo(); + callback(null, info); + }); +}; + +/** + * Closes the underlying store database + * @param {Function} callback + */ +DB.prototype.close = function(callback) { + this.store.close(callback); +}; + +/** + * This function is responsible for emitting `db/transaction` events. + * @param {Object} txInfo - The data from the bitcoind.on('tx') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool + * @param {String} txInfo.hash - The hash of the transaction + */ +DB.prototype.transactionHandler = function(tx) { + for (var i = 0; i < this.subscriptions.transaction.length; i++) { + this.subscriptions.transaction[i].emit('db/transaction', { + rejected: !txInfo.mempool, + tx: tx + }); + } +}; + +/** + * Called by Node to determine the available API methods. + */ +DB.prototype.getAPIMethods = function() { + var methods = [ + ['getBlock', this, this.getBlock, 1], + ['getBlockHashesByTimestamp', this, this.getBlockHashesByTimestamp, 2], + ['getTransaction', this, this.getTransaction, 2], + ['getTransactionWithBlockInfo', this, this.getTransactionWithBlockInfo, 2], + ['sendTransaction', this, this.sendTransaction, 1], + ['estimateFee', this, this.estimateFee, 1] + ]; + return methods; +}; + +DB.prototype.loadTip = function(callback) { + var self = this; + + var options = { + keyEncoding: 'binary', + valueEncoding: 'binary' + }; + + self.store.get(DB.PREFIXES.TIP, options, function(err, tipData) { + if(err && err instanceof levelup.errors.NotFoundError) { + self.tip = self.genesis; + self.tip.__height = 0; + self.connectBlock(self.genesis, function(err) { + if(err) { + return callback(err); + } + + self.emit('addblock', self.genesis); + callback(); + }); + return; + } else if(err) { + return callback(err); + } + + var hash = tipData.toString('hex'); + + var times = 0; + async.retry({times: 3, interval: self.retryInterval}, function(done) { + self.getBlock(hash, function(err, tip) { + if(err) { + times++; + log.warn('Bitcoind does not have our tip (' + hash + '). Bitcoind may have crashed and needs to catch up.'); + if(times < 3) { + log.warn('Retrying in ' + (self.retryInterval / 1000) + ' seconds.'); + } + return done(err); + } + + done(null, tip); + }); + }, function(err, tip) { + if(err) { + log.warn('Giving up after 3 tries. Please report this bug to https://github.com/bitpay/bitcore-node/issues'); + log.warn('Please reindex your database.'); + return callback(err); + } + + self.tip = tip; + var blockIndex = self.node.services.bitcoind.getBlockIndex(self.tip.hash); + if(!blockIndex) { + return callback(new Error('Could not get height for tip.')); + } + self.tip.__height = blockIndex.height; + callback(); + }); + }); +}; + +/** + * Will get a block from bitcoind and give a Bitcore Block + * @param {String|Number} hash - A block hash or block height + */ +DB.prototype.getBlock = function(hash, callback) { + this.node.services.bitcoind.getBlock(hash, callback); +}; + +/** + * Get block hashes between two timestamps + * @param {Number} high - high timestamp, in seconds, inclusive + * @param {Number} low - low timestamp, in seconds, inclusive + * @param {Function} callback + */ +DB.prototype.getBlockHashesByTimestamp = function(high, low, callback) { + var self = this; + var hashes = []; + var lowKey; + var highKey; + + try { + lowKey = this._encodeBlockIndexKey(low); + highKey = this._encodeBlockIndexKey(high); + } catch(e) { + return callback(e); + } + + var stream = this.store.createReadStream({ + gte: lowKey, + lte: highKey, + reverse: true, + valueEncoding: 'binary', + keyEncoding: 'binary' + }); + + stream.on('data', function(data) { + hashes.push(self._decodeBlockIndexValue(data.value)); + }); + + var error; + + stream.on('error', function(streamError) { + if (streamError) { + error = streamError; + } + }); + + stream.on('close', function() { + if (error) { + return callback(error); + } + callback(null, hashes); + }); + + return stream; +}; + +/** + * Will give a Bitcore Transaction from bitcoind by txid + * @param {String} txid - A transaction hash + * @param {Boolean} queryMempool - Include the mempool + * @param {Function} callback + */ +DB.prototype.getTransaction = function(txid, queryMempool, callback) { + this.node.services.bitcoind.getTransaction(txid, queryMempool, function(err, txBuffer) { + if (err) { + return callback(err); + } + if (!txBuffer) { + return callback(new errors.Transaction.NotFound()); + } + + callback(null, Transaction().fromBuffer(txBuffer)); + }); +}; + +/** + * Will give a Bitcore Transaction and populated information about the block included. + * @param {String} txid - A transaction hash + * @param {Boolean} queryMempool - Include the mempool + * @param {Function} callback + */ +DB.prototype.getTransactionWithBlockInfo = function(txid, queryMempool, callback) { + this.node.services.bitcoind.getTransactionWithBlockInfo(txid, queryMempool, function(err, obj) { + if (err) { + return callback(err); + } + + var tx = Transaction().fromBuffer(obj.buffer); + tx.__blockHash = obj.blockHash; + tx.__height = obj.height; + tx.__timestamp = obj.timestamp; + + callback(null, tx); + }); +}; + +/** + * Will send a transaction to the Bitcoin network. + * @param {Transaction} tx - An instance of a Bitcore Transaction + * @param {Function} callback + */ +DB.prototype.sendTransaction = function(tx, callback) { + var txString; + if (tx instanceof Transaction) { + txString = tx.serialize(); + } else { + txString = tx; + } + + try { + var txid = this.node.services.bitcoind.sendTransaction(txString); + return callback(null, txid); + } catch(err) { + return callback(err); + } +}; + +/** + * Will estimate fees for a transaction and give a result in + * satoshis per kilobyte. Similar to the bitcoind estimateFee method. + * @param {Number} blocks - The number of blocks for the transaction to be included. + * @param {Function} callback + */ +DB.prototype.estimateFee = function(blocks, callback) { + var self = this; + setImmediate(function() { + callback(null, self.node.services.bitcoind.estimateFee(blocks)); + }); +}; + +/** + * Called by the Bus to determine the available events. + */ +DB.prototype.getPublishEvents = function() { + return [ + { + name: 'db/transaction', + scope: this, + subscribe: this.subscribe.bind(this, 'transaction'), + unsubscribe: this.unsubscribe.bind(this, 'transaction') + }, + { + name: 'db/block', + scope: this, + subscribe: this.subscribe.bind(this, 'block'), + unsubscribe: this.unsubscribe.bind(this, 'block') + } + ]; +}; + +DB.prototype.subscribe = function(name, emitter) { + this.subscriptions[name].push(emitter); +}; + +DB.prototype.unsubscribe = function(name, emitter) { + var index = this.subscriptions[name].indexOf(emitter); + if (index > -1) { + this.subscriptions[name].splice(index, 1); + } +}; + +/** + * Will give the previous hash for a block. + * @param {String} blockHash + * @param {Function} callback + */ +DB.prototype.getPrevHash = function(blockHash, callback) { + var blockIndex = this.node.services.bitcoind.getBlockIndex(blockHash); + setImmediate(function() { + if (blockIndex) { + callback(null, blockIndex.prevHash); + } else { + callback(new Error('Could not get prevHash, block not found')); + } + }); +}; + +/** + * Connects a block to the database and add indexes + * @param {Block} block - The bitcore block + * @param {Function} callback + */ +DB.prototype.connectBlock = function(block, callback) { + log.debug('DB handling new chain block'); + this.runAllBlockHandlers(block, true, callback); +}; + +/** + * Disconnects a block from the database and removes indexes + * @param {Block} block - The bitcore block + * @param {Function} callback + */ +DB.prototype.disconnectBlock = function(block, callback) { + log.debug('DB removing chain block'); + this.runAllBlockHandlers(block, false, callback); +}; + +/** + * Will collect all database operations for a block from other services that implement + * `blockHandler` methods and then save operations to the database. + * @param {Block} block - The bitcore block + * @param {Boolean} add - If the block is being added/connected or removed/disconnected + * @param {Function} callback + */ +DB.prototype.runAllBlockHandlers = function(block, add, callback) { + var self = this; + var operations = []; + + // Notify block subscribers + for (var i = 0; i < this.subscriptions.block.length; i++) { + this.subscriptions.block[i].emit('db/block', block.hash); + } + + // Update tip + var tipHash = add ? new Buffer(block.hash, 'hex') : BufferUtil.reverse(block.header.prevHash); + operations.push({ + type: 'put', + key: DB.PREFIXES.TIP, + value: tipHash + }); + + // Update block index + operations.push({ + type: add ? 'put' : 'del', + key: this._encodeBlockIndexKey(block.header.timestamp), + value: this._encodeBlockIndexValue(block.hash) + }); + + async.eachSeries( + this.node.services, + function(mod, next) { + if(mod.blockHandler) { + $.checkArgument(typeof mod.blockHandler === 'function', 'blockHandler must be a function'); + + mod.blockHandler.call(mod, block, add, function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + $.checkArgument(Array.isArray(ops), 'blockHandler for ' + mod.name + ' returned non-array'); + operations = operations.concat(ops); + } + next(); + }); + } else { + setImmediate(next); + } + }, + function(err) { + if (err) { + return callback(err); + } + + log.debug('Updating the database with operations', operations); + self.store.batch(operations, callback); + } + ); +}; + +DB.prototype._encodeBlockIndexKey = function(timestamp) { + $.checkArgument(timestamp >= 0 && timestamp <= 4294967295, 'timestamp out of bounds'); + var timestampBuffer = new Buffer(4); + timestampBuffer.writeUInt32BE(timestamp); + return Buffer.concat([DB.PREFIXES.BLOCKS, timestampBuffer]); +}; + +DB.prototype._encodeBlockIndexValue = function(hash) { + return new Buffer(hash, 'hex'); +}; + +DB.prototype._decodeBlockIndexValue = function(value) { + return value.toString('hex'); +}; + +/** + * This function will find the common ancestor between the current chain and a forked block, + * by moving backwards on both chains until there is a meeting point. + * @param {Block} block - The new tip that forks the current chain. + * @param {Function} done - A callback function that is called when complete. + */ +DB.prototype.findCommonAncestor = function(block, done) { + + var self = this; + + var mainPosition = self.tip.hash; + var forkPosition = block.hash; + + var mainHashesMap = {}; + var forkHashesMap = {}; + + mainHashesMap[mainPosition] = true; + forkHashesMap[forkPosition] = true; + + var commonAncestor = null; + + async.whilst( + function() { + return !commonAncestor; + }, + function(next) { + + if(mainPosition) { + var mainBlockIndex = self.node.services.bitcoind.getBlockIndex(mainPosition); + if(mainBlockIndex && mainBlockIndex.prevHash) { + mainHashesMap[mainBlockIndex.prevHash] = true; + mainPosition = mainBlockIndex.prevHash; + } else { + mainPosition = null; + } + } + + if(forkPosition) { + var forkBlockIndex = self.node.services.bitcoind.getBlockIndex(forkPosition); + if(forkBlockIndex && forkBlockIndex.prevHash) { + forkHashesMap[forkBlockIndex.prevHash] = true; + forkPosition = forkBlockIndex.prevHash; + } else { + forkPosition = null; + } + } + + if(forkPosition && mainHashesMap[forkPosition]) { + commonAncestor = forkPosition; + } + + if(mainPosition && forkHashesMap[mainPosition]) { + commonAncestor = mainPosition; + } + + if(!mainPosition && !forkPosition) { + return next(new Error('Unknown common ancestor')); + } + + setImmediate(next); + }, + function(err) { + done(err, commonAncestor); + } + ); +}; + +/** + * This function will attempt to rewind the chain to the common ancestor + * between the current chain and a forked block. + * @param {Block} block - The new tip that forks the current chain. + * @param {Function} done - A callback function that is called when complete. + */ +DB.prototype.syncRewind = function(block, done) { + + var self = this; + + self.findCommonAncestor(block, function(err, ancestorHash) { + if (err) { + return done(err); + } + log.warn('Reorg common ancestor found:', ancestorHash); + // Rewind the chain to the common ancestor + async.whilst( + function() { + // Wait until the tip equals the ancestor hash + return self.tip.hash !== ancestorHash; + }, + function(removeDone) { + + var tip = self.tip; + + // TODO: expose prevHash as a string from bitcore + var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex'); + + self.getBlock(prevHash, function(err, previousTip) { + if (err) { + removeDone(err); + } + + // Undo the related indexes for this block + self.disconnectBlock(tip, function(err) { + if (err) { + return removeDone(err); + } + + // Set the new tip + previousTip.__height = self.tip.__height - 1; + self.tip = previousTip; + self.emit('removeblock', tip); + removeDone(); + }); + + }); + + }, done + ); + }); +}; + +/** + * This function will synchronize additional indexes for the chain based on + * the current active chain in the bitcoin daemon. In the event that there is + * a reorganization in the daemon, the chain will rewind to the last common + * ancestor and then resume syncing. + */ +DB.prototype.sync = function() { + var self = this; + + if (self.bitcoindSyncing || self.node.stopping || !self.tip) { + return; + } + + self.bitcoindSyncing = true; + + var height; + + async.whilst(function() { + height = self.tip.__height; + return height < self.node.services.bitcoind.height && !self.node.stopping; + }, function(done) { + self.node.services.bitcoind.getBlock(height + 1, function(err, block) { + if (err) { + return done(err); + } + + // TODO: expose prevHash as a string from bitcore + var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex'); + + if (prevHash === self.tip.hash) { + + // This block appends to the current chain tip and we can + // immediately add it to the chain and create indexes. + + // Populate height + block.__height = self.tip.__height + 1; + + // Create indexes + self.connectBlock(block, function(err) { + if (err) { + return done(err); + } + self.tip = block; + log.debug('Chain added block to main chain'); + self.emit('addblock', block); + setImmediate(done); + }); + } else { + // This block doesn't progress the current tip, so we'll attempt + // to rewind the chain to the common ancestor of the block and + // then we can resume syncing. + log.warn('Beginning reorg! Current tip: ' + self.tip.hash + '; New tip: ' + block.hash); + self.syncRewind(block, function(err) { + if(err) { + return done(err); + } + + log.warn('Reorg complete. New tip is ' + self.tip.hash); + done(); + }); + } + }); + }, function(err) { + if (err) { + Error.captureStackTrace(err); + return self.node.emit('error', err); + } + + if(self.node.stopping) { + self.bitcoindSyncing = false; + return; + } + + if (self.node.services.bitcoind.isSynced()) { + self.bitcoindSyncing = false; + self.node.emit('synced'); + } else { + self.bitcoindSyncing = false; + } + + }); + +}; + +module.exports = DB; diff --git a/lib/transaction.js b/lib/transaction.js new file mode 100644 index 00000000..f55df37b --- /dev/null +++ b/lib/transaction.js @@ -0,0 +1,63 @@ +'use strict'; + +var async = require('async'); +var levelup = require('levelup'); +var bitcore = require('bitcore-lib'); +var Transaction = bitcore.Transaction; + +var MAX_TRANSACTION_LIMIT = 5; + +Transaction.prototype.populateInputs = function(db, poolTransactions, callback) { + var self = this; + + if(this.isCoinbase()) { + return setImmediate(callback); + } + + async.eachLimit( + this.inputs, + db.maxTransactionLimit || MAX_TRANSACTION_LIMIT, + function(input, next) { + self._populateInput(db, input, poolTransactions, next); + }, + callback + ); +}; + +Transaction.prototype._populateInput = function(db, input, poolTransactions, callback) { + if (!input.prevTxId || !Buffer.isBuffer(input.prevTxId)) { + return callback(new Error('Input is expected to have prevTxId as a buffer')); + } + var txid = input.prevTxId.toString('hex'); + db.getTransaction(txid, true, function(err, prevTx) { + if(err instanceof levelup.errors.NotFoundError) { + // Check the pool for transaction + for(var i = 0; i < poolTransactions.length; i++) { + if(txid === poolTransactions[i].hash) { + input.output = poolTransactions[i].outputs[input.outputIndex]; + return callback(); + } + } + + return callback(new Error('Previous tx ' + input.prevTxId.toString('hex') + ' not found')); + } else if(err) { + callback(err); + } else { + input.output = prevTx.outputs[input.outputIndex]; + callback(); + } + }); +}; + +Transaction.prototype._checkSpent = function(db, input, poolTransactions, callback) { + // TODO check and see if another transaction in the pool spent the output + db.isSpentDB(input, function(spent) { + if(spent) { + return callback(new Error('Input already spent')); + } else { + callback(); + } + }); +}; + +module.exports = Transaction; diff --git a/package.json b/package.json index 4e02b7cf..c671afdb 100644 --- a/package.json +++ b/package.json @@ -52,8 +52,11 @@ "commander": "^2.8.1", "errno": "^0.1.4", "express": "^4.13.3", + "leveldown": "bitpay/leveldown#bitpay-1.4.4", + "levelup": "^1.3.1", "liftoff": "^2.2.0", "lru-cache": "^4.0.1", + "memdown": "^1.0.0", "mkdirp": "0.5.0", "path-is-absolute": "^1.0.0", "semver": "^5.0.1",