more reorg stuff.

This commit is contained in:
Chris Kleeschulte 2017-09-28 11:01:57 -04:00
parent 67c2c07ae5
commit 09b365772c
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
2 changed files with 59 additions and 24 deletions

View File

@ -25,8 +25,9 @@ var BlockService = function(options) {
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initialSync = true;
this._reorgBackToBlock = null; // use this to rewind your indexes to a specific point by height or hash
this._reorgBackToBlock = 1202419; // use this to rewind your indexes to a specific point by height or hash
this._timeOfLastBlockReport = Date.now() - 30000;
this._blocksInQueue = 0;
};
inherits(BlockService, BaseService);
@ -188,7 +189,6 @@ BlockService.prototype._checkTip = function(callback) {
return callback();
}
log.warn('Block Service: reorganization spotted...');
self._findCommonAncestor(function(err, commonAncestorHash) {
if(err) {
return callback(err);
@ -361,6 +361,7 @@ BlockService.prototype._queueBlock = function(block) {
var self = this;
self._blocksInQueue++;
self._blockProcessor.push(block, function(err) {
if (err) {
@ -375,6 +376,8 @@ BlockService.prototype._queueBlock = function(block) {
log.debug('Block Service: completed processing block: ' + block.rhash() +
' prev hash: ' + bcoin.util.revHex(block.prevBlock) + ' height: ' + self._tip.height);
self._blocksInQueue--;
});
};
@ -490,6 +493,12 @@ BlockService.prototype.onHeaders = function(callback) {
var self = this;
// when this fires, we switch into initial sync mode (like first boot)
// this includes finishing any blocks that may be in the queue, then
// checking the tip, etc.
// we may be in a reorg situation at the time this function runs
// because the header service runs this function after it completes its
// reorg routines.
self._removeAllSubscriptions();
self._resetTip(function(err) {
@ -498,7 +507,7 @@ BlockService.prototype.onHeaders = function(callback) {
}
async.retry(function(next) {
next(self._blockProcessor.length() !== 0);
next(self._blocksInQueue > 0);
}, function() {
@ -535,6 +544,9 @@ BlockService.prototype._handleReorg = function(commonAncestorHash, callback) {
var self = this;
// we want to ensure that we can reask for previously delievered inventory
self._p2p.clearInventoryCache();
log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' +
self._tip.hash + ' common ancestor hash: ' + commonAncestorHash);
@ -643,18 +655,16 @@ BlockService.prototype._processBlock = function(block, callback) {
// common case
if (!self._detectReorg(block)) {
return setImmediate(function() {
self._saveBlock(block, callback);
});
return self._saveBlock(block, callback);
}
// reorg
self._handleReorg(bcoin.util.revHex(block.prevBlock), function(err) {
if(err) {
return callback(err);
}
self._saveBlock(block, callback);
});
// reorg -- in this case, we will not handle the reorg right away
// instead, we will skip the block and wait for the eventual call to
// "onHeaders" function. When the header service calls this function,
// we will have a chance to clear out our block queue, check our tip,
// discover where to reorg to, reorg all the services that rely on
// blocks and sync from there.
return callback();
};
@ -683,13 +693,16 @@ BlockService.prototype._saveBlock = function(block, callback) {
return callback(err);
}
self._setTip({ hash: block.rhash(), height: self._tip.height + 1 });
var tipOps = utils.encodeTip(self._tip, self.name);
var tipOps = utils.encodeTip({
hash: block.rhash(),
height: self._tip.height + 1
}, self.name);
self._db.put(tipOps.key, tipOps.value, function(err) {
if(err) {
return callback(err);
}
self._setTip({ hash: block.rhash(), height: self._tip.height + 1 });
callback();
});
});
@ -753,6 +766,8 @@ BlockService.prototype._startSync = function() {
if (numNeeded > 0) {
this.on('next block', this._sync.bind(this));
this.on('synced', this._onSynced.bind(this));
clearInterval(this._reportInterval);
this._reportingInterval = setInterval(this._reportStatus.bind(this), 5000);
return this._sync();
}
@ -760,6 +775,13 @@ BlockService.prototype._startSync = function() {
};
BlockService.prototype._reportStatus = function() {
if (this._tip.height % 144 === 0 || Date.now() - this._timeOfLastBlockReport > 10000) {
this._timeOfLastBlockReport = Date.now();
this._logProgress();
}
};
BlockService.prototype._sync = function() {
var self = this;
@ -768,11 +790,6 @@ BlockService.prototype._sync = function() {
return;
}
if (self._tip.height % 144 === 0 || Date.now() - self._timeOfLastBlockReport > 10000) {
self._timeOfLastBlockReport = Date.now();
self._logProgress();
}
self._header.getNextHash(self._tip, function(err, targetHash, nextHash) {
if(err) {

View File

@ -307,6 +307,10 @@ HeaderService.prototype._persistHeader = function(block, callback) {
return self._syncBlock(block, callback);
}
// this will lead to rechecking the network for more headers after our last header
// and calling on=Headers on each service that implements it
self._initialSync = true;
self._findCommonAncestor(block, function(err, commonHeader) {
if (err || !commonHeader) {
@ -505,14 +509,16 @@ HeaderService.prototype._onHeadersSave = function(callback) {
return callback();
}
self._endHeaderSubscription();
self._startBlockSubscription();
self._endHeaderSubscription(); // we don't need headers any more
self._startBlockSubscription(); // we need new blocks coming tu us aynchronuously
self._setBestHeader();
if (!self._initialSync) {
return callback();
}
// this will happen after an inital start up and sync -and- also after a chain reorg
log.info('Header Service: sync complete.');
self._initialSync = false;
@ -520,7 +526,7 @@ HeaderService.prototype._onHeadersSave = function(callback) {
if (service.onHeaders) {
return service.onHeaders.call(service, next);
}
setImmediate(next);
next();
}, callback);
};
@ -623,15 +629,23 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) {
HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, callback) {
// remove all headers with a height greater than commonHeader
assert(this._tip.hash !== commonHeader.hash, 'Header Service: we were asked to reorg from and to the same hash.');
var ops = [];
var startingHeight = this._tip.height;
var hash = this._tip.hash;
var headerCount = 0;
while(hash !== commonHeader.hash) {
headerCount++;
var header = headers.getIndex(startingHeight--);
assert(header, 'Expected to have a header at this height, but did not. Reorg failed.');
hash = header.prevHash;
ops.push({
type: 'del',
key: this._encoding.encodeHeaderHashKey(header.hash)
@ -640,7 +654,9 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader,
type: 'del',
key: this._encoding.encodeHeaderHeightKey(header.height)
});
}
// setting our tip to the common ancestor
this._tip.hash = commonHeader.hash;
this._tip.height = commonHeader.height;
@ -805,6 +821,7 @@ HeaderService.prototype._handleLowTipHeight = function() {
// our peer has reorg'ed to lower overall height.
// we should get the first block after the split, reorg back to this height and then continue.
// we should still have no listeners for anything, blocks, headers, etc. at this point
self._p2p.getP2PBlock({
filter: {
startHash: reorgInfo.commonHeader.hash,
@ -813,6 +830,7 @@ HeaderService.prototype._handleLowTipHeight = function() {
blockHash: reorgInfo.blockHash
}, function(block) {
self._initialSync = true;
self._handleReorg(block, reorgInfo.commonHeader, function(err) {
if(err) {
@ -873,7 +891,7 @@ HeaderService.prototype._getP2PHeaders = function(hash) {
HeaderService.prototype._sync = function() {
this._startHeaderSubscription();
this._startHeaderSubscription(); // ensures only one listener will ever be registered
this._getP2PHeaders(this._tip.hash);
};