From c7c268f00a4e8ce31e7e34d111c4ae2e16a29682 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 5 Oct 2017 16:18:16 -0400 Subject: [PATCH] Adding search mempool by address. --- lib/services/address/index.js | 191 ++++++++++++++++++++-------------- lib/services/block/index.js | 2 +- lib/services/header/index.js | 24 ++++- lib/services/mempool/index.js | 73 ++++++++++++- 4 files changed, 209 insertions(+), 81 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 1985e7ed..d4caff8e 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -11,6 +11,7 @@ var _ = bitcore.deps._; var Encoding = require('./encoding'); var Transform = require('stream').Transform; var assert = require('assert'); +var Stream = require('stream'); var AddressService = function(options) { @@ -22,6 +23,7 @@ var AddressService = function(options) { this._transaction = this.node.services.transaction; this._network = this.node.network; this._db = this.node.services.db; + this._mempool = this.node.services.mempool; if (this._network === 'livenet') { this._network = 'main'; @@ -39,7 +41,8 @@ AddressService.dependencies = [ 'block', 'header', 'transaction', - 'timestamp' + 'timestamp', + 'mempool' ]; // ---- public function prototypes @@ -156,13 +159,13 @@ AddressService.prototype.getAddressSummary = function(address, options, callback self._tx.getTransaction(key.txid, options, function(err, tx) { if(err) { - log.error(err); + log.error('Address Service: gettransaction ' + err); txStream.emit('error', err); return; } if (!tx) { - log.error('Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.'); + log.error('Address Service: Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.'); txStream.emit('error', new Error('Txid should map to a tx.')); return; } @@ -201,7 +204,7 @@ AddressService.prototype.getAddressSummary = function(address, options, callback }; txStream.on('error', function(err) { - log.error(err); + log.error('Address Service: txstream on error ' + err); txStream.unpipe(); }); @@ -295,28 +298,12 @@ AddressService.prototype.stop = function(callback) { setImmediate(callback); }; +AddressService.prototype._getTxidStream = function(address, options) { -// ---- start private function prototypes -AddressService.prototype._getAddressHistory = function(address, options, callback) { - - var self = this; - - options = options || {}; - options.start = options.start || 0; - options.end = options.end || 0xffffffff; - - var endHeightBuf = new Buffer(4); - endHeightBuf.writeUInt32BE(options.end); - - if (_.isUndefined(options.queryMempool)) { - options.queryMempool = true; - } - - var results = []; - var start = self._encoding.encodeAddressIndexKey(address, options.start); + var start = this._encoding.encodeAddressIndexKey(address, options.start); var end = Buffer.concat([ start.slice(0, address.length + 4), - endHeightBuf, + options.endHeightBuf, new Buffer(new Array(83).join('f'), 'hex') ]); @@ -326,81 +313,72 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac }; // txid stream - var txidStream = self._db.createKeyStream(criteria); + var txidStream = this._db.createKeyStream(criteria); txidStream.on('close', function() { txidStream.unpipe(); }); +}; - // tx stream - var txStream = new Transform({ objectMode: true, highWaterMark: 1000 }); +AddressService.prototype._transformTxForAddressHistory = function(opts, chunk, enc, callback) { - var streamErr; - txStream.on('end', function() { + var self = this; + var key = self._encoding.decodeAddressIndexKey(chunk); - if (streamErr) { - return callback(streamErr); + self._tx.getTransaction(key.txid, opts, function(err, tx) { + + if (err) { + log.error('Address Service: gettransaction ' + err); + opts.stream.emit('error', err); + return callback(); } - callback(null, results); + if (!tx) { + log.error('Address Service: Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.'); + opts.stream.emit('error', err); + return callback(); + } - }); + assert(tx.__height >- 0, 'tx must have a height'); + self._header.getBlockHeader(tx.__height, function(err, hash) { - // pipe txids into tx stream for processing - txidStream.pipe(txStream); - - txStream._transform = function(chunk, enc, callback) { - - var key = self._encoding.decodeAddressIndexKey(chunk); - - self._tx.getTransaction(key.txid, options, function(err, tx) { - - if (err) { - log.error(err); - txStream.emit('error', err); + if(err) { + log.error('Address Service: getblockheader ' + err); + opts.stream.emit('error', err); return callback(); } - if (!tx) { - log.error('Could not find tx for txid: ' + key.txid + '. This should not be possible, check indexes.'); - txStream.emit('error', err); - return callback(); - } - - assert(tx.__height >- 0, 'tx must have a height'); - self._header.getBlockHeader(tx.__height, function(err, hash) { - - if(err) { - log.error(err); - txStream.emit('error', err); - return callback(); - } - - tx.__blockhash = hash; - - var outputSatoshis = 0; - tx.outputs.forEach(function(output) { - outputSatoshis += output.value; - }); - - var inputSatoshis = 0; - tx.__inputValues.forEach(function(value) { - inputSatoshis += value; - }); - - tx.__outputSatoshis = outputSatoshis; - tx.__inputSatoshis = inputSatoshis; - results.push(tx); - callback(); + tx.__blockhash = hash; + var outputSatoshis = 0; + tx.outputs.forEach(function(output) { + outputSatoshis += output.value; }); + var inputSatoshis = 0; + tx.__inputValues.forEach(function(value) { + inputSatoshis += value; + }); + + tx.__outputSatoshis = outputSatoshis; + tx.__inputSatoshis = inputSatoshis; + opts.results.push(tx); + callback(); + }); - }; + }); + +}; + +AddressService.prototype._getTxStream = function(address, options) { + + var txStream = new Transform({ objectMode: true, highWaterMark: 1000 }); + + options.stream = txStream; txStream.on('error', function(err) { - log.error(err); + log.error('Address Service: txstream err: ' + err); txStream.unpipe(); }); @@ -409,6 +387,65 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac callback(); }; + txStream._transform = this._transformTxForAddressHistory.bind(this, options); + +}; + +AddressService.prototype._getAddressHistory = function(address, options, callback) { + + var self = this; + + options = options || {}; + options.start = options.start || 0; + options.end = options.end || 0xffffffff; + + options.endHeightBuf = new Buffer(4); + options.endHeightBuf.writeUInt32BE(options.end); + options.results = []; + + if (_.isUndefined(options.queryMempool)) { + options.queryMempool = true; + } + + async.waterfall([ + + function(next) { + + if (!options.queryMempool) { + return next(); + } + + self._mempool.getTxidsByAddress(address, next); + }, + + function(mempoolTxids, next) { + + console.log(mempoolTxids); + + var txStream = self._getTxStream(address, options); + txStream.on('end', function() { + return callback(null, options.results); + }); + + if (mempoolTxids) { + + var mempoolTxidStream = new Stream.Readable({ objectMode: true }); + mempoolTxidStream.pipe(txStream); + + mempoolTxids.forEach(function(txid) { + mempoolTxidStream.push(txid); + }); + + mempoolTxidStream.unpipe(); + + } + + var txidStream = self._getTxidStream(address, options); + txidStream.pipe(txStream); + next(); + } + ], callback); + }; AddressService.prototype._removeBlock = function(block, callback) { diff --git a/lib/services/block/index.js b/lib/services/block/index.js index d5f28072..1e21c71b 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -793,7 +793,7 @@ BlockService.prototype._saveBlock = function(block, callback) { BlockService.prototype._handleError = function(err) { if (!this.node.stopping) { - log.error(err); + log.error('Block Service: handle error ' + err); return this.node.stop(); } }; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index ff3077f9..6cc97d5c 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -29,6 +29,7 @@ var HeaderService = function(options) { this._lastHeader = null; this._initialSync = true; this._originalHeight = 0; + this._lastHeaderCount = 2000; this._slowMode = options.slowMode; }; @@ -426,6 +427,20 @@ HeaderService.prototype._onHeaders = function(headers) { var self = this; + // TODO we could be reaching this because either our headers list is a multiple of 2000 + // or because our peer returned us an empty list because our checkpoint hash is not on + // its mainchain. + if (headers.length === 0) { + self._onHeadersSave(function(err) { + if (err) { + return self._handleError(err); + } + }); + } + + // used to tell the header sync loop when to stop + self._lastHeaderCount = headers.length; + if (self._headerInterval) { clearInterval(self._headerInterval); self._headerInterval = null; @@ -433,6 +448,10 @@ HeaderService.prototype._onHeaders = function(headers) { log.debug('Header Service: Received: ' + headers.length + ' header(s).'); + if (!headers[0]) { + return; + } + var dbOps = self._getDBOpForLastHeader(headers[0]); var transformedHeaders = self._transformHeaders(headers); @@ -549,7 +568,10 @@ HeaderService.prototype._startBlockSubscription = function() { HeaderService.prototype._syncComplete = function() { - return this._tip.height >= this._bestHeight; + // we always ask for the max number of headers, which is 2000. + // so any response with < 2000 means we have reached the end of the headers list. + // we could make an extra call if the number of total headers is multiple of 2000. + return this._lastHeaderCount < 2000; }; diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index d77e09cf..a9b53e47 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -10,6 +10,14 @@ var MempoolService = function(options) { this._subscriptions.transaction = []; this._db = this.node.services.db; this._p2p = this.node.services.p2p; + this._network = this.node.network; + + if (this._network === 'livenet') { + this._network = 'main'; + } + if (this._network === 'regtest') { + this._network = 'testnet'; + } }; util.inherits(MempoolService, BaseService); @@ -18,7 +26,8 @@ MempoolService.dependencies = ['db']; MempoolService.prototype.getAPIMethods = function() { var methods = [ - ['getMempoolTransaction', this, this.getMempoolTransaction, 1] + ['getMempoolTransaction', this, this.getMempoolTransaction, 1], + ['getTxidsByAddress', this, this.getTxidsByAddress, 1], ]; return methods; }; @@ -112,7 +121,6 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) { var self = this; -console.log('query: ', txid); self._db.get(self._encoding.encodeMempoolTransactionKey(txid), function(err, tx) { if (err) { @@ -129,6 +137,67 @@ console.log('query: ', txid); }; +MempoolService.prototype.getTxidsByAddress = function(address, callback) { + // given an address, give me all the tx ids for txs that involve this address + // perceivably, our mempool will remain a managable size, so cycling over the whole + // pool won't be too spendy? + var self = this; + var results = []; + var start = self._encoding.encodeMempoolTransactionKey(new Array(65).join('0')); + var end = self._encoding.encodeMempoolTransactionKey(new Array(65).join('f')); + + var criteria = { + gte: start, + lte: end + }; + + var stream = self._db.createReadStream(criteria); + + stream.on('end', function() { + return callback(null, results); + }); + + stream.on('data', function(data) { + var tx = self._encoding.decodeMempoolTransactionValue(data.value); + var txid = self._involvesAddress(tx, address); + if (txid) { + results.push(txid); + } + }); + +}; + +MempoolService.prototype._involvesAddress = function(tx, address) { + + var _address; + for(var i = 0; i < tx.inputs.length; i++) { + var input = tx.inputs[i]; + _address = input.getAddress(); + if (!_address) { + continue; + } + _address.network = this._network; + _address = _address.toString(); + if (address === _address) { + return tx.txid(); + } + } + + for(i = 0; i < tx.outputs.length; i++) { + var output = tx.outputs[i]; + _address = output.getAddress(); + if (!_address) { + continue; + } + _address.network = this._network; + _address = _address.toString(); + if (address === _address) { + return tx.txid(); + } + } + +}; + MempoolService.prototype.stop = function(callback) { callback(); };