Switch to remove items from mempool index as they leave asynchronously.

This commit is contained in:
Braydon Fuller 2015-10-29 17:13:36 -04:00
parent f0ec424161
commit 086ba5fcfc
2 changed files with 92 additions and 55 deletions

View File

@ -6,7 +6,6 @@ var async = require('async');
var index = require('../../');
var log = index.log;
var errors = index.errors;
var Transaction = require('../../transaction');
var bitcore = require('bitcore-lib');
var levelup = require('levelup');
var $ = bitcore.util.preconditions;
@ -35,6 +34,7 @@ var AddressService = function(options) {
this.subscriptions['address/balance'] = {};
this.node.services.bitcoind.on('tx', this.transactionHandler.bind(this));
this.node.services.bitcoind.on('txleave', this.transactionLeaveHandler.bind(this));
this.mempoolOutputIndex = {};
this.mempoolInputIndex = {};
@ -131,6 +131,17 @@ AddressService.prototype.transactionOutputHandler = function(messages, tx, outpu
}
};
/**
* This will handle data from the daemon "txleave" that a transaction has left the mempool.
* @param {Object} txInfo - The data from the daemon.on('txleave') event
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {String} txInfo.hash - The hash of the transaction
*/
AddressService.prototype.transactionLeaveHandler = function(txInfo) {
var tx = bitcore.Transaction().fromBuffer(txInfo.buffer);
this.removeMempoolIndex(tx);
};
/**
* This will handle data from the daemon "tx" event, go through each of the outputs
* and send messages by calling `transactionEventHandler` to any subscribers for a
@ -164,6 +175,72 @@ AddressService.prototype.transactionHandler = function(txInfo) {
}
};
AddressService.prototype.removeMempoolIndex = function(tx) {
var txid = tx.hash;
var outputLength = tx.outputs.length;
for (var outputIndex = 0; outputIndex < outputLength; outputIndex++) {
var output = tx.outputs[outputIndex];
if (!output.script) {
continue;
}
var addressInfo = this._extractAddressInfoFromScript(output.script);
if (!addressInfo) {
continue;
}
var addressStr = bitcore.Address({
hashBuffer: addressInfo.hashBuffer,
type: addressInfo.addressType,
network: this.node.network
}).toString();
// Remove from the mempool output index
if (this.mempoolOutputIndex[addressStr]) {
var txs = this.mempoolOutputIndex[addressStr];
for (var t = 0; t < txs.length; t++) {
if (txs[t].txid === txid && txs[t].outputIndex === outputIndex) {
txs.splice(t, 1);
}
}
if (txs.length === 0) {
delete this.mempoolOutputIndex[addressStr];
}
}
}
var inputLength = tx.inputs.length;
for (var inputIndex = 0; inputIndex < inputLength; inputIndex++) {
var input = tx.inputs[inputIndex];
// Remove from the mempool spent index
var spentIndexKey = [input.prevTxId.toString('hex'), input.outputIndex].join('-');
if (this.mempoolSpentIndex[spentIndexKey]) {
delete this.mempoolSpentIndex[spentIndexKey];
}
var address = input.script.toAddress(this.node.network);
if (!address) {
continue;
}
var inputAddressStr = address.toString();
// Remove from the mempool input index
if (this.mempoolInputIndex[inputAddressStr]) {
var inputTxs = this.mempoolInputIndex[inputAddressStr];
for (var x = 0; x < inputTxs.length; x++) {
if (inputTxs[x].txid === txid && inputTxs[x].inputIndex === inputIndex) {
inputTxs.splice(x, 1);
}
}
if (inputTxs.length === 0) {
delete this.mempoolInputIndex[inputAddressStr];
}
}
}
};
/**
* This function will update the mempool address index with the necessary
* information for further lookups. There are three indexes:
@ -239,11 +316,11 @@ AddressService.prototype.updateMempoolIndex = function(tx) {
if (!address) {
continue;
}
var addressStr = address.toString();
if (!this.mempoolInputIndex[addressStr]) {
this.mempoolInputIndex[addressStr] = [];
var inputAddressStr = address.toString();
if (!this.mempoolInputIndex[inputAddressStr]) {
this.mempoolInputIndex[inputAddressStr] = [];
}
this.mempoolInputIndex[addressStr].push({
this.mempoolInputIndex[inputAddressStr].push({
txid: tx.hash, // TODO use buffer
inputIndex: inputIndex
});
@ -251,30 +328,6 @@ AddressService.prototype.updateMempoolIndex = function(tx) {
};
/**
* This function is called by the Database Service when the database itself
* has finished synchronization. It will retrieve a copy of all transactions
* from the mempool and create an address index for fast look-ups. The previous
* index will be reset.
*/
AddressService.prototype.resetMempoolIndex = function(callback) {
var self = this;
var transactionBuffers = self.node.services.bitcoind.getMempoolTransactions();
this.mempoolInputIndex = {};
this.mempoolOutputIndex = {};
this.mempoolSpentIndex = {};
async.each(transactionBuffers, function(txBuffer, next) {
var tx = Transaction().fromBuffer(txBuffer);
self.updateMempoolIndex(tx);
setImmediate(next);
}, function(err) {
if (err) {
return callback(err);
}
callback();
});
};
/**
* This function is optimized to return address information about an output script
* without constructing a Bitcore Address instance.

View File

@ -1036,9 +1036,8 @@ describe('Address Service', function() {
});
});
});
describe('#updateMempoolIndex', function() {
describe('#updateMempoolIndex/#removeMempoolIndex', function() {
var am;
var db = {};
var tx = Transaction().fromBuffer(txBuf);
before(function() {
@ -1049,35 +1048,20 @@ describe('Address Service', function() {
am.updateMempoolIndex(tx);
am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2');
am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'][0].txid.should.equal('45202ffdeb8344af4dec07cddf0478485dc65cc7d08303e45959630c89b51ea2');
Object.keys(am.mempoolSpentIndex).length.should.equal(14);
am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8'].length.should.equal(12);
am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5'].length.should.equal(1);
});
});
describe('#resetMempoolIndex', function() {
var am;
var db = {};
before(function() {
var testnode = {
db: db,
services: {
bitcoind: {
getMempoolTransactions: sinon.stub().returns([txBuf]),
on: sinon.stub()
}
}
};
am = new AddressService({node: testnode});
am.updateMempoolIndex = sinon.stub();
});
it('will reset the input and output indexes', function(done) {
am.resetMempoolIndex(function() {
am.updateMempoolIndex.callCount.should.equal(1);
done();
});
it('will remove the input and output indexes', function() {
am.removeMempoolIndex(tx);
should.not.exist(am.mempoolInputIndex['18Z29uNgWyUDtNyTKE1PaurbSR131EfANc']);
should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']);
Object.keys(am.mempoolSpentIndex).length.should.equal(0);
should.not.exist(am.mempoolInputIndex['1JT7KDYwT9JY9o2vyqcKNSJgTWeKfV3ui8']);
should.not.exist(am.mempoolOutputIndex['12w93weN8oti3P1e5VYEuygqyujhADF7J5']);
});
});
describe('#getAddressSummary', function() {
var node = {