From 086ba5fcfc7f7a32b720e361e87756af04ead062 Mon Sep 17 00:00:00 2001 From: Braydon Fuller Date: Thu, 29 Oct 2015 17:13:36 -0400 Subject: [PATCH] Switch to remove items from mempool index as they leave asynchronously. --- lib/services/address/index.js | 111 ++++++++++++++++++++-------- test/services/address/index.unit.js | 36 +++------ 2 files changed, 92 insertions(+), 55 deletions(-) diff --git a/lib/services/address/index.js b/lib/services/address/index.js index 3a9d0e29..3cc62510 100644 --- a/lib/services/address/index.js +++ b/lib/services/address/index.js @@ -6,7 +6,6 @@ var async = require('async'); var index = require('../../'); var log = index.log; var errors = index.errors; -var Transaction = require('../../transaction'); var bitcore = require('bitcore-lib'); var levelup = require('levelup'); var $ = bitcore.util.preconditions; @@ -35,6 +34,7 @@ var AddressService = function(options) { this.subscriptions['address/balance'] = {}; this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this)); + this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this)); this.mempoolOutputIndex = {}; this.mempoolInputIndex = {}; @@ -131,6 +131,17 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu } }; +/** + * This will handle data from the daemon "txleave" that a transaction has left the mempool. + * @param {Object} txInfo - The data from the daemon.on('txleave') event + * @param {Buffer} txInfo.buffer - The transaction buffer + * @param {String} txInfo.hash - The hash of the transaction + */ +AddressService.prototype.transactionLeaveHandler = function(txInfo) { + var tx = bitcore.Transaction().fromBuffer(txInfo.buffer); + this.removeMempoolIndex(tx); +}; + /** * This will handle data from the daemon "tx" event, go through each of the outputs * and send messages by calling `transactionEventHandler` to any subscribers for a @@ -164,6 +175,72 @@ AddressService.prototype.transactionHandler = function(txInfo) { } }; +AddressService.prototype.removeMempoolIndex = function(tx) { + + var txid = tx.hash; + + var outputLength = tx.outputs.length; + for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) { + var output = tx.outputs[outputIndex]; + if (!output.script) { + continue; + } + var addressInfo = this._extractAddressInfoFromScript(output.script); + if (!addressInfo) { + continue; + } + + var addressStr = bitcore.Address({ + hashBuffer: addressInfo.hashBuffer, + type: addressInfo.addressType, + network: this.node.network + }).toString(); + + // Remove from the mempool output index + if (this.mempoolOutputIndex[addressStr]) { + var txs = this.mempoolOutputIndex[addressStr]; + for (var t = 0; t < txs.length; t++) { + if (txs[t].txid === txid && txs[t].outputIndex === outputIndex) { + txs.splice(t, 1); + } + } + if (txs.length === 0) { + delete this.mempoolOutputIndex[addressStr]; + } + } + } + var inputLength = tx.inputs.length; + for (var inputIndex = 0; inputIndex < inputLength; inputIndex++) { + + var input = tx.inputs[inputIndex]; + + // Remove from the mempool spent index + var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-'); + if (this.mempoolSpentIndex[spentIndexKey]) { + delete this.mempoolSpentIndex[spentIndexKey]; + } + + var address = input.script.toAddress(this.node.network); + if (!address) { + continue; + } + var inputAddressStr = address.toString(); + + // Remove from the mempool input index + if (this.mempoolInputIndex[inputAddressStr]) { + var inputTxs = this.mempoolInputIndex[inputAddressStr]; + for (var x = 0; x < inputTxs.length; x++) { + if (inputTxs[x].txid === txid && inputTxs[x].inputIndex === inputIndex) { + inputTxs.splice(x, 1); + } + } + if (inputTxs.length === 0) { + delete this.mempoolInputIndex[inputAddressStr]; + } + } + } +}; + /** * This function will update the mempool address index with the necessary * information for further lookups. There are three indexes: @@ -239,11 +316,11 @@ AddressService.prototype.updateMempoolIndex = function(tx) { if (!address) { continue; } - var addressStr = address.toString(); - if (!this.mempoolInputIndex[addressStr]) { - this.mempoolInputIndex[addressStr] = []; + var inputAddressStr = address.toString(); + if (!this.mempoolInputIndex[inputAddressStr]) { + this.mempoolInputIndex[inputAddressStr] = []; } - this.mempoolInputIndex[addressStr].push({ + this.mempoolInputIndex[inputAddressStr].push({ txid: tx.hash, // TODO use buffer inputIndex: inputIndex }); @@ -251,30 +328,6 @@ AddressService.prototype.updateMempoolIndex = function(tx) { }; -/** - * This function is called by the Database Service when the database itself - * has finished synchronization. It will retrieve a copy of all transactions - * from the mempool and create an address index for fast look-ups. The previous - * index will be reset. - */ -AddressService.prototype.resetMempoolIndex = function(callback) { - var self = this; - var transactionBuffers = self.node.services.bitcoind.getMempoolTransactions(); - this.mempoolInputIndex = {}; - this.mempoolOutputIndex = {}; - this.mempoolSpentIndex = {}; - async.each(transactionBuffers, function(txBuffer, next) { - var tx = Transaction().fromBuffer(txBuffer); - self.updateMempoolIndex(tx); - setImmediate(next); - }, function(err) { - if (err) { - return callback(err); - } - callback(); - }); -}; - /** * This function is optimized to return address information about an output script * without constructing a Bitcore Address instance. diff --git a/test/services/address/index.unit.js b/test/services/address/index.unit.js index 41e65307..a63f6647 100644 --- a/test/services/address/index.unit.js +++ b/test/services/address/index.unit.js @@ -1036,9 +1036,8 @@ describe('Address Service', function() { }); }); }); - describe('#updateMempoolIndex', function() { + describe('#updateMempoolIndex/#removeMempoolIndex', function() { var am; - var db = {}; var tx = Transaction().fromBuffer(txBuf); before(function() { @@ -1049,35 +1048,20 @@ describe('Address Service', function() { am.updateMempoolIndex(tx); am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2'); + Object.keys(am.mempoolSpentIndex).length.should.equal(14); am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8'].length.should.equal(12); am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'].length.should.equal(1); }); - }); - describe('#resetMempoolIndex', function() { - var am; - var db = {}; - - before(function() { - var testnode = { - db: db, - services: { - bitcoind: { - getMempoolTransactions: sinon.stub().returns([txBuf]), - on: sinon.stub() - } - } - }; - am = new AddressService({node: testnode}); - am.updateMempoolIndex = sinon.stub(); - - }); - it('will reset the input and output indexes', function(done) { - am.resetMempoolIndex(function() { - am.updateMempoolIndex.callCount.should.equal(1); - done(); - }); + it('will remove the input and output indexes', function() { + am.removeMempoolIndex(tx); + should.not.exist(am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc']); + should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); + Object.keys(am.mempoolSpentIndex).length.should.equal(0); + should.not.exist(am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8']); + should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']); }); + }); describe('#getAddressSummary', function() { var node = {