Address: Fixed race condition with transaction event handlers

This commit is contained in:
Braydon Fuller 2015-11-03 17:11:35 -05:00
parent c5c8e21c6c
commit 0ea035c4f0
2 changed files with 49 additions and 21 deletions

View File

@ -223,8 +223,10 @@ AddressService.prototype.transactionLeaveHandler = function(txInfo) {
* @param {Buffer} txInfo.buffer - The transaction buffer
* @param {Boolean} txInfo.mempool - If the transaction was accepted in the mempool
* @param {String} txInfo.hash - The hash of the transaction
* @param {Function} [callback] - Optional callback
*/
AddressService.prototype.transactionHandler = function(txInfo) {
AddressService.prototype.transactionHandler = function(txInfo, callback) {
var self = this;
// Basic transaction format is handled by the daemon
// and we can safely assume the buffer is properly formatted.
@ -237,15 +239,31 @@ AddressService.prototype.transactionHandler = function(txInfo) {
this.transactionOutputHandler(messages, tx, i, !txInfo.mempool);
}
// Update mempool index
if (txInfo.mempool) {
this.updateMempoolIndex(tx, true);
if (!callback) {
callback = function(err) {
if (err) {
return log.error(err);
}
};
}
for (var key in messages) {
this.transactionEventHandler(messages[key]);
this.balanceEventHandler(null, messages[key].addressInfo);
function finish(err) {
if (err) {
return callback(err);
}
for (var key in messages) {
self.transactionEventHandler(messages[key]);
self.balanceEventHandler(null, messages[key].addressInfo);
}
callback();
}
if (txInfo.mempool) {
self.updateMempoolIndex(tx, true, finish);
} else {
setImmediate(finish);
}
};
/**
@ -254,7 +272,7 @@ AddressService.prototype.transactionHandler = function(txInfo) {
* @param {Transaction} - An instance of a Bitcore Transaction
* @param {Boolean} - Add/remove from the index
*/
AddressService.prototype.updateMempoolIndex = function(tx, add) {
AddressService.prototype.updateMempoolIndex = function(tx, add, callback) {
/* jshint maxstatements: 100 */
var operations = [];
@ -362,11 +380,15 @@ AddressService.prototype.updateMempoolIndex = function(tx, add) {
}
this.mempoolIndex.batch(operations, function(err) {
if (err) {
return log.error(err);
}
});
if (!callback) {
callback = function(err) {
if (err) {
return log.error(err);
}
};
}
this.mempoolIndex.batch(operations, callback);
};

View File

@ -357,24 +357,30 @@ describe('Address Service', function() {
});
describe('#transactionHandler', function() {
it('will pass outputs to transactionOutputHandler and call transactionEventHandler and balanceEventHandler', function() {
it('will pass outputs to transactionOutputHandler and call transactionEventHandler and balanceEventHandler', function(done) {
var txBuf = new Buffer('01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0704ffff001d0104ffffffff0100f2052a0100000043410496b538e853519c726a2c91e61ec11600ae1390813a627c66fb8be7947be63c52da7589379515d4e0a604f8141781e62294721166bf621e73a82cbf2342c858eeac00000000', 'hex');
var am = new AddressService({
var am1 = new AddressService({
mempoolMemoryIndex: true,
node: mocknode
});
var address = '12c6DSiU4Rq3P4ZxziKxzrL5LmMBrzjrJX';
var message = {};
am.transactionOutputHandler = function(messages) {
am1.transactionOutputHandler = function(messages) {
messages[address] = message;
};
am.transactionEventHandler = sinon.spy();
am.balanceEventHandler = sinon.spy();
am.transactionHandler({
am1.transactionEventHandler = sinon.stub();
am1.balanceEventHandler = sinon.stub();
am1.transactionHandler({
buffer: txBuf
}, function(err) {
if (err) {
throw err;
}
am1.transactionEventHandler.callCount.should.equal(1);
am1.balanceEventHandler.callCount.should.equal(1);
done();
});
am.transactionEventHandler.callCount.should.equal(1);
am.balanceEventHandler.callCount.should.equal(1);
});
});