From 765b7288a72e2d4787e8d67694ee580ec78cc370 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 3 Nov 2017 18:23:42 -0400 Subject: [PATCH] wip on fix for txs list. --- lib/services/address/index.js | 144 ++++++++++++++++++++---------- lib/services/block/index.js | 2 +- lib/services/mempool/index.js | 38 ++++---- lib/services/transaction/index.js | 2 +- 4 files changed, 117 insertions(+), 69 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 2592b842..95744561 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -44,6 +44,10 @@ AddressService.dependencies = [ 'mempool' ]; +// this must return the to-from number of txs for ALL passed in addresses sort from latest txs to earliest +// for example if the query /api/addrs/txs?from=0&to=5&noAsm=1&noScriptSig=1&noSpent=1, and the addresses passed +// in are [addr1, addr2, addr3], then if addr3 has tx1 at height 10, addr2 has tx2 at height 9 and tx1 has no txs, +// then I would pass back [tx1, tx2] in that order AddressService.prototype.getAddressHistory = function(addresses, options, callback) { var self = this; @@ -51,6 +55,7 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba options = options || {}; options.from = options.from || 0; options.to = options.to || 0xffffffff; + options.txIdList = []; if (_.isUndefined(options.queryMempool)) { options.queryMempool = true; @@ -60,26 +65,34 @@ AddressService.prototype.getAddressHistory = function(addresses, options, callba addresses = [addresses]; } - async.mapSeries(addresses, function(address, next) { + async.eachLimit(addresses, 8, function(address, next) { - self._getAddressHistory(address, options, next); + self._getAddressTxidHistory(address, options, next); - }, function(err, txList) { + }, function(err) { if(err) { return callback(err); } - txList = utils.dedupByTxid(txList); - txList = utils.orderByConfirmations(txList); + // armed with our complete list of txids and heights, we will go and get the actual txs + self._getAddressTxHistory(options, function(err, txList) { - var results = { - totalCount: options.txCount || 0, - items: txList - }; + if (err) { + return callback(err); + } - callback(null, results); + txList = utils.dedupByTxid(txList); + txList = utils.orderByConfirmations(txList); + var results = { + totalCount: options.txiIdList.length || 0, + items: txList + }; + + callback(null, results); + + }); }); }; @@ -226,21 +239,30 @@ AddressService.prototype.getAddressUnspentOutputs = function(address, options, c return next(null, []); } - self._mempool.getTxsByAddress(address, 'output', next); + self._mempool.getTxidsByAddress(address, 'output', next); }, // if mempool utxos, then add them first - function(mempoolTxs, next) { + function(mempoolTxids, next) { - if (mempoolTxs.length <= 0) { + if (mempoolTxids.length <= 0) { return next(); } - mempoolTxs.forEach(function(tx) { - results = results.concat(self._getMempoolUtxos(tx, address)); - }); + async.eachLimit(mempoolTxids, 4, function(id, next) { - next(); + self._mempool.getMempoolTransaction(id.txid, function(err, tx) { + + if (err || !tx) { + return next(err || new Error('Address Service: missing tx: ' + id.txid)); + } + + results = results.concat(self._getMempoolUtxos(tx, address)); + next(); + + }); + + }); }, @@ -415,8 +437,41 @@ AddressService.prototype._getTxStream = function(address, options) { }; +AddressService.prototype._getAddressTxHistory = function(options, callback) { + + var self = this; + + // sort the txids by height ascending + var ids = _.sortBy(options.txIdList, function(item) { + return item.height; + }); + + // slice the txids based on pagination needs + ids = ids.slice(options.from, options.to); + + // go and get the actual txs + async.mapLimit(ids, function(id, next) { + + if (id.height === -1) { + return self._mempool.getMempoolTransaction(id.txid, function(err, tx) { + + if (err || !tx) { + return next(err || new Error('Address Service: could not find tx: ' + id.txid)); + } + + self._transaction.setTxMetaInfo(tx, options, next); + + }); + } + + self._transaction.getTransaction(id.txid, next); + + }, callback); + +}; + // main api function for insight-api/bws -AddressService.prototype._getAddressHistory = function(address, options, callback) { +AddressService.prototype._getAddressTxidHistory = function(address, options, callback) { var self = this; @@ -426,8 +481,6 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac options.endHeightBuf = new Buffer(4); options.endHeightBuf.writeUInt32BE(options.end); - options.results = []; - options.txCount = 0; // this tracks the number of txs in the record set for pagination if (_.isUndefined(options.queryMempool)) { options.queryMempool = true; @@ -442,48 +495,47 @@ AddressService.prototype._getAddressHistory = function(address, options, callbac return next(null, []); } - self._mempool.getTxsByAddress(address, 'both', next); + self._mempool.getTxidsByAddress(address, 'both', next); }, // add the meta data such as input values, etc. - function(mempoolTxs, next) { + function(mempoolTxids, next) { - if (mempoolTxs.length <= 0) { + if (mempoolTxids.length <= 0) { return next(); } - async.mapSeries(mempoolTxs, function(tx, next) { - self._transaction.setTxMetaInfo(tx, options, next); - }, function(err, txs) { - if (err) { - return next(err); - } - // what tx range are we looking for? - options.results = txs.slice(options.from, options.to); - next(); - }); + + options.txIdList = mempoolTxids; + next(); }, // stream the rest of the confirmed txids out of the address index function(next) { - if (options.results.length >= (options.to - options.from)) { - return callback(null, options.results); - } + var txIdTransformStream = new Transform({ objectMode: true }); - options.from += options.results.length; + txIdTransformStream._flush = function(callback) { + txIdTransformStream.emit('end'); + callback(); + }; - var txStream = self._getTxStream(address, options); - - txStream.on('end', function() { - return next(null, options.results); - }); - - txStream.on('error', function(err) { + txIdTransformStream.on('error', function(err) { log.error('Address Service: txstream err: ' + err); - txStream.unpipe(); + txIdTransformStream.unpipe(); }); + txIdTransformStream.on('end', function() { + next(null, options.txIdList); + }); + + txIdTransformStream._transform = function(chunk, enc, callback) { + var txInfo = self._encoding.decodeAddressIndexKey(chunk); + options.txIdList.push({ txid: txInfo.txid, height: txInfo.height }); + callback(); + }; + var txidStream = self._getTxidStream(address, options); - txidStream.pipe(txStream); + txidStream.pipe(txIdTransformStream); + } ], callback); diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 12b60c41..8d078a8a 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -29,7 +29,7 @@ var BlockService = function(options) { this._processingBlock = false; this._blocksInQueue = 0; - this._recentBlockHashesCount = options.recentBlockHashesCount || 1000; // block service won't reorg past this point + this._recentBlockHashesCount = options.recentBlockHashesCount || 144; // block service won't reorg past this point this._recentBlockHashes = new LRU(this._recentBlockHashesCount); this._readAheadBlockCount = options.readAheadBlockCount || 2; // this is the number of blocks to direct the p2p service to read aheead }; diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index fc8be565..89125ed9 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -30,7 +30,7 @@ MempoolService.dependencies = ['db']; MempoolService.prototype.getAPIMethods = function() { var methods = [ ['getMempoolTransaction', this, this.getMempoolTransaction, 1], - ['getTxsByAddress', this, this.getTxsByAddress, 2], + ['getTxidsByAddress', this, this.getTxsByAddress, 2], ]; return methods; }; @@ -53,35 +53,31 @@ MempoolService.prototype.start = function(callback) { }; MempoolService.prototype._flushMempool = function(callback) { + var self = this; + var totalCount = 0; log.warn('Mempool Service: flushing mempool, this could take a minute.'); - // TODO: just handle the txindex for now, later handle both txindex and addressindex - // TODO: plan a migration system for upgrades to the indexes - var ops = []; var criteria = { - gte: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('0'), 'hex')]), - lt: Buffer.concat([ self._encoding.servicePrefix, new Buffer(new Array(65).join('f'), 'hex')]) + gte: self._encoding.encodeMempoolTransactionKey(new Array(65).join('0')), + lte: self._encoding.encodeMempoolTransactionKey(new Array(65).join('f')) }; - var stream = self._db.createKeyStream(criteria); + var stream = self._db.createReadStream(criteria); - stream.on('data', function(key) { + stream.on('data', function(data) { + var ops = self._getAddressOperations(self._encoding.decodeMempoolTransactionValue(data.value)); ops.push({ type: 'del', - key: key + key: data.key }); + self._db.batch(ops); }); stream.on('end', function() { - self._db.batch(ops, function(err) { - if (err) { - return callback(err); - } - log.info('Mempool Service: completed flushing: ' + ops.length + ' mempool records.'); - callback(); - }); + log.info('Mempool Service: completed flushing: ' + totalCount + ' tx mempool records.'); + callback(); }); }; @@ -254,7 +250,7 @@ MempoolService.prototype.getMempoolTransaction = function(txid, callback) { }; -MempoolService.prototype.getTxsByAddress = function(address, type, callback) { +MempoolService.prototype.getTxidsByAddress = function(address, type, callback) { var self = this; var results = []; @@ -266,7 +262,7 @@ MempoolService.prototype.getTxsByAddress = function(address, type, callback) { lte: end }; - var stream = self._db.createReadStream(criteria); + var stream = self._db.createKeyStream(criteria); stream.on('error', function() { return []; @@ -276,15 +272,15 @@ MempoolService.prototype.getTxsByAddress = function(address, type, callback) { callback(null, results); }); - stream.on('data', function(data) { - var addressInfo = self._encoding.decodeMempoolAddressKey(data.key); + stream.on('data', function(key) { + var addressInfo = self._encoding.decodeMempoolAddressKey(key); if (type === 'input') { type = 1; } else if (type === 'output') { type = 0; } if (type === 'both' || type === addressInfo.input) { - results.push(self._encoding.decodeMempoolAddressValue(data.value)); + results.push({ txid: addressInfo.txid, height: -1 }); } }); diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 4235683f..cb0a6c19 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -154,7 +154,7 @@ TransactionService.prototype.setTxMetaInfo = function(tx, options, callback) { } // the tx's that contain these input values could, themselves be unconfirmed - // we are also assuming that this tx is from thet mempool + // we are also assuming that this tx is from the mempool self._getInputValues(tx, options, function(err, inputValues) { if (err) {