diff --git a/lib/services/mempool.js b/lib/services/mempool.js new file mode 100644 index 00000000..f1ac531d --- /dev/null +++ b/lib/services/mempool.js @@ -0,0 +1,312 @@ +'use strict'; +var BaseService = require('../Service'); +var util = require('util'); +var EventEmitter = require('events').EventEmitter; +var bitcore = require('bitcore-lib'); +var MempoolService = function(options) { + EventEmitter.call(this); + + this.node = options.node; + this.name = options.name; + this._txIndex = {}; + this._addressIndex = {}; + this.store = this.nodes.services.db.store; + this._handleBlocks = false; +}; + +util.inherits(MempoolService, BaseService); + +/** + * Describes the dependencies that should be loaded before this service. + */ +MempoolService.dependencies = [ 'bitcoind', 'db' ]; + +MempoolService.prototype.blockHandler = function(block, connectBlock, callback) { + var self = this; + + if (!self._handleBlocks) { + return setImmediate(callback); + } + + var txs = block.transactions; + + var action = 'del'; + if (!connectBlock) { + action = 'put'; + } + + var transactionLength = txs.length; + for(var i = 0; i < txs.length; i++) { + + var tx = txs[i]; + self._updateMempool(tx, action); + } + setImmediate(callback); +}; + +MempoolService.prototype.getPublishEvents = function() { + return []; +}; + +MempoolService.prototype.getAPIMethods = function() { + return []; +}; + +MempoolService.prototype.start = function(callback) { + var self = this; + self.node.services.db.on('synced', function() { + var bus = self.node.openBus({ remoteAddress: 'localhost' }); + bus.subscribe('bitcoind/rawtransaction'); + + bus.on('bitcoind/rawtransaction', function(txHex) { + var tx = new bitcore.Transaction(txHex); + self._updateMempool(tx, 'put'); + }); + self._handleBlocks = true; + }); + self.node.services.db.getPrefix(self.name, function(err, servicePrefix) { + if(err) { + return callback(err); + } + self.servicePrefix = servicePrefix; + self._encoding = new Encoding(self.servicePrefix); + callback(); + }); +}; + +/** + * Function to be called when bitcore-node is stopped + */ +MempoolService.prototype.stop = function(done) { + setImmediate(done); +}; + +/** + * Setup express routes + * @param {Express} app + */ +MempoolService.prototype.setupRoutes = function() { + // Setup express routes here +}; + +MempoolService.prototype.getRoutePrefix = function() { + return this.name; +}; + +MempoolService.prototype._getTransactionAddressDetailOperations = function(tx, action) { + var self = this; + var operations = []; + + var txid = tx.id; + var inputs = tx.inputs; + var outputs = tx.outputs; + + var outputLength = outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = outputs[outputIndex]; + + var script = output.script; + + if(!script) { + log.debug('Invalid script'); + continue; + } + + var address = self..getAddressString(script); + + if(!address) { + continue; + } + + var key = self.encoding.encodeMempoolAddressIndexKey(address, txid); + operations.push({ + type: action, + key: key + }); + + } + + if(tx.isCoinbase()) { + continue; + } + + //TODO deal with P2PK + async.eachLimit(inputs, 10, function(input, next) { + + if(!input.script) { + log.debug('Invalid script'); + return next(); + } + + var inputAddress = self.getAddressString(input.script); + + if(!inputAddress) { + return next(); + } + + var inputKey = self.encodeMempoolAddressIndexKey(inputAddress, txid); + + operations.push({ + type: action, + key: inputKey + }); + + } + return operations; +}; + +MempoolService.prototype.getAddressString = function(script, output) { + var address = script.toAddress(); + if(address) { + return address.toString(); + } + + try { + var pubkey = script.getPublicKey(); + if(pubkey) { + return pubkey.toString('hex'); + } + } catch(e) { + //log.warn('Error getting public key from: ', script.toASM(), script.toHex()); + // if there is an error, it's because a pubkey can not be extracted from the script + // continue on and return null + } + + //TODO add back in P2PK, but for this we need to look up the utxo for this script + if(output && output.script && output.script.isPublicKeyOut()) { + return output.script.getPublicKey().toString('hex'); + } + + //log.warn('No utxo given for script spending a P2PK: ', script.toASM(), script.toHex()); + return null; +}; + +MempoolService.prototype._getInputValues = function(tx, callback) { + var self = this; + + if (tx.isCoinbase()) { + return callback(null, []); + } + + async.mapLimit(tx.inputs, this.concurrency, function(input, next) { + self.getTransaction(input.prevTxId.toString('hex'), {}, function(err, prevTx) { + if(err) { + return next(err); + } + if (!prevTx) { + return next(null, 0); + } + if (!prevTx.outputs[input.outputIndex]) { + return next(new Error('Input did not have utxo.')); + } + var satoshis = prevTx.outputs[input.outputIndex].satoshis; + next(null, satoshis); + }); + }, callback); +}; + + +MempoolService.prototype._updateCache = function(operation) { + if (operation.type === 'del') { + return this._cache.del(operation.key); + } + this._cache.set(operation.key, operation.value); +}; + +MempoolService.prototype.getTransaction = function(txid, callback) { + var self = this; + var key = self._encoding.encodeMempoolTransactionKey(txid); + var txBuffer = self._cache.get(key); + if (txBuffer) { + return setImmediate(function() { + callback(null, self._encoding.decodeMempoolTransactionValue(txBuffer)); + }); + } + self.store.get(key, function(err, value) { + if(err) { + return callback(err); + } + callback(null, self._encoding.decodeMempoolTransactionValue(value)); + }); +}; + +MempoolService.prototype._getTransactionOperation = function(tx, action, callback) { + var self = this; + self._getInputValues(tx, function(err, inputValues) { + if(err) { + return callback(err); + } + tx.__inputValues = inputValues; + var operation = { + type: action, + key: self._encoding.encodeMempoolTransactionKey(tx.id), + value: self._encoding.encodeMempoolTransactionValue(tx) + }; + callback(null, operation); + }); +}; + +MempoolService.prototype._updateMempool = function(tx, action, callback) { + var self = this; + self._getTransactionOperation(tx, action, function(err, operation) { + if(err) { + return callback(err); + } + var operations = [operation].concat(self._getTransactionAddressDetailOperations(tx, action)); + self.store.batch(operations, function(err) { + if(err) { + log.error('batch operation for updating Mempool failed.'); + return; + } + operations.forEach(self._updateCache); + }); + }); +}; + +MempoolService.prototype.getTransactionsByAddress = function(address, callback) { + var self = this; + var txids = []; + var maxTxid = new Buffer(new Array(65).join('f'), 'hex'); + var start = self._encoding.encodeMempoolAddressKey(address); + var end = Buffer.concat([self._encoding.encodeMempoolAddressKey(address), maxTxid]); + var stream = self.store.createKeyStream({ + gte: start, + lte: end + }); + var streamErr; + stream.on('error', function(err) { + streamErr = err; + }); + stream.on('data', function(data) { + var key = self._encoding.decodeMempoolAddressKey(data); + txids.push(key.txid); + }); + stream.on('end', function() { + async.mapLimit(txids, 10, function(txid, next) { + self.getTransaction(txid, next); + }, callback); + }); +}; + +MempoolService.prototype.getTransactionsByAddresses = function(addresses, callback) { + var transactions = {}; + async.eachLimit(addresses, 10, function(address, next) { + self.getTransactionsByAddress(address, function(err, txs) { + if(err) { + return next(err); + } + txs.forEach(function(tx) { + transactions[tx.id] = tx; + }); + next(); + }); + }, function(err) { + if(err) { + return callback(err); + } + callback(null, Object.values(transactions)); + }); +}; + +module.exports = MempoolService; + diff --git a/lib/services/transaction.js b/lib/services/transaction.js index 31ec1877..f2630dad 100644 --- a/lib/services/transaction.js +++ b/lib/services/transaction.js @@ -9,7 +9,7 @@ var Encoding = require('../encoding'); var LRU = require('lru-cache'); var utils = require('./wallet-api/utils'); var index = require('../'); -var log = index.log; + /** * The Transaction Service builds upon the Database Service and the Bitcoin Service to add additional * functionality for getting information by bitcoin transaction hash/id. This includes the current @@ -21,24 +21,6 @@ var log = index.log; function TransactionService(options) { BaseService.call(this, options); this.concurrency = options.concurrency || 20; - this.maxMemPoolSize = options.maxMemPoolSize || 100 * 1024 * 1024; // 100 MB - /* upon initialization of the mempool, only txids are obtained from - * a trusted bitcoin node (the bitcoind service's rpchost or p2p option) - * Since, the mempool is very temporal, I see no reason to take the - * resources to gather the actual tx data. Any NEW txs that arrive - * after the mempool is initialized will have their full tx data - * If a tx is queried from the mempool and it does not have tx data, - * then call to the trusted bitcoind will take place and the result set. - */ - this._mempool = LRU({ - max: utils.parseByteCount(this.maxMemPoolSize), - length: function(tx) { - if (tx) { - return tx.toBuffer().length; - } - return 1; - } - }); this.currentTransactions = {}; } @@ -46,7 +28,8 @@ inherits(TransactionService, BaseService); TransactionService.dependencies = [ 'db', - 'timestamp' + 'timestamp', + 'mempool' ]; TransactionService.prototype.start = function(callback) { @@ -54,39 +37,13 @@ TransactionService.prototype.start = function(callback) { self.store = this.node.services.db.store; - var bus = self.node.openBus({ remoteAddress: 'localhost' }); - bus.subscribe('bitcoind/rawtransaction'); - - bus.on('bitcoind/rawtransaction', function(txHex) { - var tx = new bitcore.Transaction(txHex); - self._updateMempool(tx); - }); - - async.series([ - function(next) { - self.node.services.bitcoind.getMempool(function(err, txidList) { - if(err) { - return next(err); - } - for(var i = 0; i < txidList.length; i++) { - self._updateMempool(txidList[i]); - } - next(); - }); - }, function(next) { - self.node.services.db.getPrefix(self.name, function(err, prefix) { - if(err) { - return callback(err); - } - self.prefix = prefix; - self.encoding = new Encoding(self.prefix); - next(); - }); - }], function(err) { - if(err) { - return callback(err); - } - callback(); + self.node.services.db.getPrefix(self.name, function(err, prefix) { + if(err) { + return callback(err); + } + self.prefix = prefix; + self.encoding = new Encoding(self.prefix); + callback(); }); }; @@ -94,31 +51,6 @@ TransactionService.prototype.stop = function(callback) { setImmediate(callback); }; -//TODO should we maintain a mempool whilst bitcoind is syncing from a start up to latest height? -TransactionService.prototype._updateMempool = function(tx, action) { - if (action === 'del') { - return this._mempool.del(tx.id); - } - var val, key; - if (_.isString(tx)) { - val = false; - key = tx; - } else { - val = tx; - key = tx.id; - } - this._mempool.set(key, val); - return this._checkMempoolSize(); -}; - -TransactionService.prototype._checkMempoolSize = function() { - var warningPercentage = 80; - var percentage = this._mempool.length / this.maxMemPoolSize * 100.0; - if (percentage >= warningPercentage) { - log.warn('Mempool size has reached ' + percentage + '% of the max size of the mempool (' + this.maxMemPoolSize + 'bytes)'); - } -}; - TransactionService.prototype.blockHandler = function(block, connectBlock, callback) { var self = this; var action = 'put'; @@ -175,6 +107,33 @@ TransactionService.prototype.blockHandler = function(block, connectBlock, callba }); }; +TransactionService.prototype._getMissingInputValues = function(tx, callback) { + var self = this; + + if (tx.isCoinbase()) { + return callback(null, []); + } + + async.eachOf(tx.inputs, function(input, index, next) { + if (tx.__inputValues[index]) { + return next(); + } + self.getTransaction(input.prevTxId.toString('hex'), {}, function(err, prevTx) { + if(err) { + return next(err); + } + if (!prevTx) { + return next(new Error('previous Tx missing.')); + } + if (!prevTx.outputs[input.outputIndex]) { + return next(new Error('Input did not have utxo.')); + } + var satoshis = prevTx.outputs[input.outputIndex].satoshis; + tx.__inputValues[index] = satoshis; + next(); + }); + }, callback); +}; TransactionService.prototype._getInputValues = function(tx, callback) { var self = this; @@ -206,32 +165,34 @@ TransactionService.prototype.getTransaction = function(txid, options, callback) }); } - var memPoolTx = self._mempool.get(txid); - if (options && options.queryMempool && memPoolTx) { - return setImmediate(function() { - callback(null, memPoolTx); - }); - } else if (memPoolTx === false) { - self.node.services.bitcoind.getTransaction(txid, function(err, tx) { - if(err) { - return callback(err); - } - self._updateMempool(tx); - callback(null, tx); - }); - } - - //transaction was not in our mempool, lookup in the database var key = self.encoding.encodeTransactionKey(txid); - self.node.services.db.store.get(key, function(err, buffer) { - if(err) { - return callback(err); - } - - var tx = self.encoding.decodeTransactionValue(buffer); - callback(null, tx); - }); + async.waterfall([ + function(next) { + self.node.services.db.store.get(key, function(err, buffer) { + if(err) { + return callback(err); + } + var tx = self.encoding.decodeTransactionValue(buffer); + callback(null, tx); + }); + }, function(next, tx) { + if (tx) { + return next(null, tx); + } + if (!options || !options.queryMempool) { + return next(); + } + self.node.services.mempool.getTransaction(txid, function(err, tx) { + if(err) { + return next(err); + } + if (!tx) { + return next(); + } + self._getMissingInputValues(tx, next); + }); + }], callback); }; diff --git a/lib/services/wallet-api/index.js b/lib/services/wallet-api/index.js index 636062d5..1725feac 100644 --- a/lib/services/wallet-api/index.js +++ b/lib/services/wallet-api/index.js @@ -569,10 +569,25 @@ WalletService.prototype._getTransactions = function(walletId, options, callback) end: options.end || Math.pow(2, 32) - 1 }; var key = walletId + opts.start + opts.end; + function finish(transactions) { var from = options.from || 0; var to = options.to || transactions.length; - callback(null, transactions.slice(from, to), transactions.length); + if (!options.queryMempool) { + return callback(null, transactions.slice(from, to), transactions.length); + } + self._getAddresses(walletId, function(err, addresses) { + if(err) { + return callback(err); + } + self.mempool.getTransactionsByAddresses(addresses, function(err, mempoolTxs) { + if(err) { + return callback(err); + } + transactions = transaction.concat(mempoolTxs); + callback(null, transactions.slice(from, to), transactions.length); + }); + }); } if (!self._cache.peek(key)) { @@ -650,8 +665,17 @@ WalletService.prototype._getTransactions = function(walletId, options, callback) }; WalletService.prototype._getAddresses = function(walletId, callback) { - var key = this._encoding.encodeWalletAddressKey(walletId); - this.store.get(key, callback); + var self = this; + var key = self._encoding.encodeWalletAddressKey(walletId); + self.store.get(key, function(err, value) { + if(err) { + return callback(err); + } + if (!value) { + return callback(null, []); + } + callback(null, self._encoding.decodeWalletAddressValue(value)); + }); }; WalletService.prototype._importAddresses = function(walletId, addresses, callback) {