Added reorg test.

This commit is contained in:
Chris Kleeschulte 2017-09-21 17:05:09 -04:00
parent d5e5904329
commit ada997c8bd
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
6 changed files with 192 additions and 213 deletions

View File

@ -28,7 +28,6 @@ var BlockService = function(options) {
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initialSync = true;
this._reorgQueue = [];
};
inherits(BlockService, BaseService);
@ -50,18 +49,29 @@ BlockService.prototype.getAPIMethods = function() {
};
BlockService.prototype.getInfo = function(callback) {
callback(null, {
blocks: this.getTip().height,
connections: this._p2p.getNumberOfPeers(),
timeoffset: 0,
proxy: '',
testnet: this.node.network === 'livenet' ? false: true,
errors: '',
network: this.node.network,
relayFee: 0,
version: 'bitcore-1.1.2',
protocolversion: 700001,
difficulty: this._header.getCurrentDifficulty()
var self = this;
// check to ensure the header service is all up-to-date
async.retry(function(next) {
var ready = self._header.getBestHeight() < self.getTip().height;
next(ready);
}, function(err) {
if (err) {
return callback(err);
}
callback(null, {
blocks: self.getTip().height,
connections: self._p2p.getNumberOfPeers(),
timeoffset: 0,
proxy: '',
testnet: self.node.network === 'livenet' ? false: true,
errors: '',
network: self.node.network,
relayFee: 0,
version: 'bitcore-1.1.2',
protocolversion: 700001,
difficulty: self._header.getCurrentDifficulty()
});
});
};
@ -189,14 +199,7 @@ BlockService.prototype._checkTip = function(callback) {
return callback(err || new Error('All headers not found.'));
}
assert(self._reorgQueue.length === 0, 'Reorg Queue is not the expected length.');
var hash = header.hash;
if (self._reorgQueue.indexOf(hash) === -1) {
self._reorgQueue.push(hash);
}
self._handleReorg(hash, headers, callback);
self._handleReorg(header.hash, headers, callback);
});
@ -280,79 +283,41 @@ BlockService.prototype._detectReorg = function(block) {
return false;
};
// Collects the headers that must be backed out of our indexes during a reorg:
// walks backward from the current tip's header until reaching the common
// ancestor's height. Headers are returned newest-first (tip first).
//
// @param {Object} commonAncestorHeader - header of the fork point (its height bounds the walk)
// @param {Object} allHeaders - map-like collection (get(hash) -> header) holding the full header chain
// @returns {Array} headers strictly above the common ancestor, ordered tip -> ancestor+1
BlockService.prototype._getOldHeaders = function(commonAncestorHeader, allHeaders) {
  var header = allHeaders.get(this._tip.hash);
  assert(header, 'Header needed to find old blocks during reorg.');
  var headers = [];
  while (header.height > commonAncestorHeader.height) {
    headers.push(header);
    header = allHeaders.get(header.prevHash);
    // fail loudly with a meaningful message if the chain of prev hashes is
    // broken, instead of throwing an opaque TypeError on the next iteration
    assert(header, 'Header chain broken while walking back to the common ancestor during reorg.');
  }
  return headers;
};
BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) {
// the prev hash of the passed in "hash" should also point to a block who has the same prev hash.
var self = this;
var count = 0;
var _oldTip = self._tip.hash;
var _newTip = hash;
var oldBlocks = [];
assert(_oldTip, 'We don\'t have a tip hash to reorg away from!');
var reorgHeader = allHeaders.get(hash);
assert(reorgHeader, 'No reorg header found, cannot find common ancestor.');
async.whilst(
// test case
function() {
var prevHash = reorgHeader.prevHash;
return _oldTip !== _newTip && ++count < allHeaders.size;
self.getBlock(prevHash, function(err, block) {
},
// get block
function(next) {
if (err) {
return callback(err);
}
// old tip (our current tip) has to be in database
self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, data) {
if (err || !data) {
return next(err || new Error('missing block'));
}
// once we've found the old tip, we will find its prev and check to see if matches new tip's prev
var block = self._encoding.decodeBlockValue(data);
// for the purposes of building the old blocks list
// apply the block's height
block.__height = self._tip.height;
// apply the block's timestamp
self._timestamp.getTimestamp(block.rhash(), function(err, timestamp) {
if (err || !timestamp) {
return next(err || new Error('missing timestamp'));
}
block.__ts = timestamp;
// we will squirrel away the block because our services will need to remove it after we've found the common ancestor
oldBlocks.push(block);
// this is our current tip's prev hash
_oldTip = bcoin.util.revHex(block.prevBlock);
// our current headers have the correct state of the chain, so consult that for its prev hash
var header = allHeaders.get(_newTip);
assert(header, 'The expected hash is not in the headers list.');
// set new tip to the prev hash
_newTip = header.prevHash;
next();
});
});
}, function(err) {
if (err) {
return callback(err);
}
assert(_newTip === _oldTip, 'A common ancestor hash could not be found.');
callback(null, _newTip, oldBlocks);
});
var commonAncestorHeader = allHeaders.get(block.rhash());
callback(null, commonAncestorHeader);
});
};
BlockService.prototype._getBlock = function(hash, callback) {
@ -402,18 +367,15 @@ BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback)
var self = this;
self._reorging = true;
var reorgHeader = allHeaders.get(reorgHash);
assert(reorgHeader, 'We were asked to reorg to a non-existent hash.');
self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHash, oldBlocks) {
self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHeader) {
if (err) {
return callback(err);
}
var commonAncestorHeader = allHeaders.get(commonAncestorHash);
assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.');
// if we are syncing and we haven't sync'ed to the common ancestor hash, we can safely ignore this reorg
@ -421,23 +383,25 @@ BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback)
return callback();
}
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash +
' at height: ' + commonAncestorHeader.height + '. Removing: ' + oldBlocks.length + ' block(s) from our indexes.');
var reorgHeaders = self._getOldHeaders(commonAncestorHeader, allHeaders);
self._processReorg(commonAncestorHeader, oldBlocks, function(err) {
assert(reorgHeaders, 'Expected to have reorg headers to remove');
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash +
' and height: ' + commonAncestorHeader.height + '. Removing a total of: ' + reorgHeaders.length + ' block(s).');
self._processReorg(commonAncestorHeader, reorgHeaders, function(err) {
if(err) {
return callback(err);
}
self._reorging = false;
callback();
});
});
};
// this JUST rewinds the chain back to the common ancestor block, nothing more
@ -491,48 +455,59 @@ BlockService.prototype._onAllHeaders = function() {
};
BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, callback) {
BlockService.prototype._processReorg = function(commonAncestorHeader, reorgHeaders, callback) {
var self = this;
var operations = [];
var services = self.node.services;
async.eachSeries(
services,
function(mod, next) {
if(mod.onReorg) {
mod.onReorg.call(mod, [commonAncestorHeader, oldBlocks], function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
},
// process one block at a time just in case we have a huge amount of blocks to back out
async.eachSeries(reorgHeaders, function(header, next) {
function(err) {
if (err) {
return callback(err);
}
self._db.batch(operations, function(err) {
if (err) {
return callback(err);
async.waterfall([
function(next) {
self.getBlock(header.hash, next);
},
function(block, next) {
if (!block) {
return next(new Error('block not found for reorg.'));
}
self._timestamp.getTimestamp(header.hash, function(err, timestamp) {
if (err || !timestamp) {
return next(err || new Error('timestamp missing from reorg.'));
}
block.__height = header.height;
block.__ts = timestamp;
next(null, block);
});
},
function(block, next) {
async.eachSeries(services, function(mod, next) {
if(mod.onReorg) {
mod.onReorg.call(mod, [commonAncestorHeader, [block]], function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
self._onReorg(commonAncestorHeader, [block], next);
});
} else {
setImmediate(next);
}
}, next);
}
], next);
}, function(err) {
self._onReorg(commonAncestorHeader, oldBlocks, callback);
});
if (err) {
return callback(err);
}
);
self._db.batch(operations, callback);
});
};
BlockService.prototype._processBlock = function(block) {
@ -613,9 +588,9 @@ BlockService.prototype.onBlock = function(block, callback) {
});
};
BlockService.prototype._onBlock = function(block) {
BlockService.prototype.newBlock = function(block) {
if (this.node.stopping || this._reorging || this._tip.hash === block.rhash()) {
if (this.node.stopping || this._tip.hash === block.rhash()) {
return;
}
@ -635,41 +610,19 @@ BlockService.prototype._onBlock = function(block) {
};
BlockService.prototype._setListeners = function() {
var self = this;
self._header.on('headers', self._onAllHeaders.bind(self));
};
self._header.on('reorg', function(block, headers) {
log.debug('Block Service: detected a reorg from the header service.');
if (self._reorgQueue.indexOf(block) === -1) {
self._reorgQueue.push(block);
BlockService.prototype.newReorg = function(block, headers, callback) {
var self = this;
log.info('Block Service: detected a reorg from the header service.');
self._handleReorg(block.rhash(), headers, function(err) {
if (err) {
return callback(err);
}
var _block;
async.whilst(function() {
_block = self._reorgQueue.shift();
return _block;
}, function(next) {
self._handleReorg(_block.rhash(), headers, function(err) {
if (err) {
return next(err);
}
self._onBlock(_block);
next();
});
}, function(err) {
if (err) {
log.error(err);
self.node.stop();
}
});
self.newBlock(block, callback);
});
};
BlockService.prototype._setTip = function(tip) {
@ -687,21 +640,6 @@ BlockService.prototype._startSync = function() {
this._sync();
};
BlockService.prototype._startSubscriptions = function() {
if (this._subscribed) {
return;
}
this._subscribed = true;
if (!this._bus) {
this._bus = this.node.openBus({remoteAddress: 'localhost-block'});
}
this._bus.on('header/block', this._onBlock.bind(this));
this._bus.subscribe('header/block');
};
BlockService.prototype._sync = function() {
var self = this;
@ -742,7 +680,7 @@ BlockService.prototype._sync = function() {
endHash: nextHash
},
blockHash: targetHash
}, self._onBlock.bind(self));
}, self.newBlock.bind(self));
});
@ -754,7 +692,6 @@ BlockService.prototype._sync = function() {
log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
this._startSubscriptions();
};
module.exports = BlockService;

View File

@ -27,7 +27,6 @@ var HeaderService = function(options) {
this._checkpoint = options.checkpoint || 2000; // set to -1 to resync all headers.
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._lastHeader = null;
this.blockServiceSyncing = true;
this.lastBlockQueried = null;
this._initialSync = true;
};
@ -281,11 +280,12 @@ HeaderService.prototype._processBlocks = function(block, callback) {
return callback(err);
}
if (!self.blockServiceSyncing) {
self._broadcast(block);
}
callback();
async.eachSeries(self.node.services, function(mod, next) {
if (!mod.newBlock) {
return setImmediate(next);
}
mod.newBlock.call(mod, block, next);
}, callback);
});
@ -307,10 +307,6 @@ HeaderService.prototype._persistHeader = function(block, callback) {
}
log.warn('Header Service: Reorganization detected, current tip hash: ' +
self._tip.hash + ', new block causing the reorg: ' + block.rhash() +
' common ancestor hash: ' + commonHeader.hash);
self._handleReorg(block, commonHeader, function(err) {
if(err) {
@ -409,7 +405,8 @@ HeaderService.prototype._onHeaders = function(headers) {
header = header.toObject();
assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height);
assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash +
' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height);
var ops = self._onHeader(header);
@ -567,7 +564,8 @@ HeaderService.prototype._detectReorg = function(block, callback) {
return callback(null, header);
}
log.warn('Block: ' + block.rhash() + 'references: ' + prevHash + ' as its previous block, yet we have not stored this block in our data set, thus ignoring this block.');
log.warn('Block: ' + block.rhash() + 'references: ' + prevHash +
' as its previous block, yet we have not stored this block in our data set, thus ignoring this block.');
callback(null, false);
});
@ -577,6 +575,12 @@ HeaderService.prototype._detectReorg = function(block, callback) {
HeaderService.prototype._handleReorg = function(block, commonHeader, callback) {
var self = this;
log.warn('Header Service: Reorganization detected, current tip hash: ' +
self._tip.hash + ', new block causing the reorg: ' + block.rhash() +
' common ancestor hash: ' + commonHeader.hash + ' and height: ' +
commonHeader.height);
var reorgHeader = self._formatHeader(block);
self.getAllHeaders(function(err, headers) {
@ -595,11 +599,12 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) {
return callback(err);
}
// emit the fact that there is a reorg even though the block
// service may not have reached this point in its sync.
// Let the block service sort that out
self.emit('reorg', block, headers);
return callback();
async.eachSeries(self.node.services, function(mod, next) {
if (!mod.newReorg) {
return setImmediate(next);
}
mod.newReorg.call(mod, block, headers, next);
}, callback);
});
});
@ -689,8 +694,6 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) {
var self = this;
self._removeAllSubscriptions();
var newPeerHeaders = new utils.SimpleMap();
var headerCount = 0;
@ -757,6 +760,8 @@ HeaderService.prototype._handleLowTipHeight = function() {
self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' +
' Checking the new peer\'s headers for a reorganization event.');
self._removeAllSubscriptions();
self._findReorgConditionInNewPeer(function(err, reorgInfo) {
if (err) {
@ -794,6 +799,7 @@ HeaderService.prototype._handleLowTipHeight = function() {
return self.node.stop();
}
self._startHeaderSubscription();
self._sync();
});

View File

@ -1,7 +1,6 @@
'use strict';
var BaseService = require('../../service');
var util = require('util');
var utils = require('../../utils');
var Encoding = require('./encoding');
var index = require('../../');
var log = index.log;
@ -94,7 +93,9 @@ MempoolService.prototype.onReorg = function(args, callback) {
}
}
callback(null, removalOps);
setImmediate(function() {
callback(null, removalOps);
});
};
MempoolService.prototype._startSubscriptions = function() {

View File

@ -317,7 +317,9 @@ TransactionService.prototype.onReorg = function(args, callback) {
}
}
callback(null, removalOps);
setImmediate(function() {
callback(null, removalOps);
});
};

View File

@ -43,16 +43,44 @@ describe('Block Service', function() {
});
});
describe('#_getOldBlocks', function() {
it('should get the old block list (blocks to be removed from indexes)', function(done) {
blockService._tip = { height: 10, hash: 'cc' };
var header = { height: 8, hash: 'aa' };
var getStub = sandbox.stub();
getStub.onCall(0).returns({ hash: 'bb', height: 9 });
getStub.onCall(1).returns({ hash: 'aa', height: 8 });
var allHeaders = { get: getStub };
sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, {});
blockService._timestamp = { getTimestamp: sinon.stub().callsArgWith(1, null, 1234) };
blockService._getOldBlocks(header, allHeaders, function(err, blocks) {
if (err) {
return done(err);
}
expect(blocks.length).to.equal(1);
expect(blocks[0].__height).to.equal(9);
expect(blocks[0].__ts).to.equal(1234);
done();
});
});
});
describe('#_findCommonAncestor', function() {
it('should find the common ancestor between the current chain and the new chain', function(done) {
blockService._tip = { hash: block2.rhash(), height: 70901 };
var data = blockService._encoding.encodeBlockValue(block2);
blockService._db = { get: sandbox.stub().callsArgWith(1, null, data) };
blockService._timestamp = { getTimestamp: sandbox.stub().callsArgWith(1, null, 1234) };
var commonAncestorHash = bcoin.util.revHex(block2.prevBlock);
var allHeaders = { get: sandbox.stub().returns({ prevHash: commonAncestorHash }), size: 1e6 };
sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, block1);
sandbox.stub(blockService, '_getOldBlocks').callsArgWith(2, null, [block2]);
var allHeaders = { get: sandbox.stub().returns({ hash: 'somecommonancestor', prevHash: block1.rhash() }), size: 1e6 };
blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) {
@ -60,9 +88,7 @@ describe('Block Service', function() {
return done(err);
}
block2.__ts = 1234;
block2.__height = 70901;
expect(commonAncestorHashActual).to.equal(commonAncestorHash);
expect(commonAncestorHashActual).to.equal('somecommonancestor');
expect(oldBlockList).to.deep.equal([block2]);
done();
@ -123,11 +149,9 @@ describe('Block Service', function() {
it('should set listeners for headers, reorg', function() {
var on = sandbox.stub();
var once = sandbox.stub();
blockService._header = { on: on, once: once };
blockService._header = { on: on };
blockService._setListeners();
expect(on.calledOnce).to.be.true;
expect(once.calledOnce).to.be.true;
expect(on.calledTwice).to.be.true;
});
});

View File

@ -10,6 +10,7 @@ var utils = require('../../../lib/utils');
var Block = require('bitcore-lib').Block;
var BN = require('bn.js');
var Emitter = require('events').EventEmitter;
var bcoin = require('bcoin');
describe('Header Service', function() {
@ -101,11 +102,15 @@ describe('Header Service', function() {
it('should start the sync process', function() {
headerService._bestHeight = 123;
var unsub = sinon.stub();
headerService._tip = { height: 120 };
headerService._bus = { unsubscribe: unsub };
headerService._blockProcessor = { length: 0 };
var getHeaders = sandbox.stub();
headerService._p2p = { getHeaders: getHeaders };
headerService._lastHeader = { height: 124 };
headerService._startSync();
expect(unsub.calledOnce).to.be.true;
expect(getHeaders.calledOnce).to.be.true;
});
@ -133,7 +138,11 @@ describe('Header Service', function() {
var onHeader = sandbox.stub(headerService, '_onHeader');
var saveHeaders = sandbox.stub(headerService, '_saveHeaders');
headerService._tip = { height: 123, hash: 'aa' };
headerService._lastHeader = { hash: header.prevHash };
headerService._onHeaders(headers);
expect(onHeader.calledOnce).to.be.true;
expect(saveHeaders.calledOnce).to.be.true;