Fixed waiting for blocks to finish processing before processing a reorg.

This commit is contained in:
Chris Kleeschulte 2017-10-30 15:30:01 -04:00
parent 6a2c6c8d4f
commit bc8dee5810
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F

View File

@ -26,7 +26,8 @@ var BlockService = function(options) {
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._initialSync = false;
this._serviceIniting = false;
this._processingBlock = false;
this._blocksInQueue = 0;
this._recentBlockHashesCount = options.recentBlockHashesCount || 1000; // block service won't reorg past this point
this._recentBlockHashes = new LRU(this._recentBlockHashesCount);
@ -522,17 +523,19 @@ BlockService.prototype.onHeaders = function(callback) {
var self = this;
// at this point, we need to ensure the block header is in a very specific state
// 1. we should have no listeners of any kind that could cause side effects
// 2. we should have no pending events, yet to be fired, after this routine yields
// if this service is waiting on block-related callbacks to be fired in the event loop,
// then we need to wait for the _processingBlock flag to be set to false.
// when this flag is false, we know we aren't waiting on any new blocks or historical blocks
// that we asked for, but not yet received
self._initialSync = true;
self._serviceIniting = true;
self._removeAllSubscriptions();
// this should ensure that any handlers, yet to be fired, will fire
setImmediate(function() {
// a heavy block could take a really long time to index
async.retry({ interval: 1000, times: 100 }, function(next) {
return next(self._processingBlock);
}, function(err) {
if (err) {
return callback(err);
}
self._onHeaders(callback);
});
@ -542,8 +545,6 @@ BlockService.prototype._onHeaders = function(callback) {
var self = this;
self._serviceIniting = false;
// check whether or not we need build a new tip (unlikely)
self._resetTip(function(err) {
@ -560,6 +561,8 @@ BlockService.prototype._onHeaders = function(callback) {
}, function() {
self._removeAllSubscriptions();
// check to see if our current tip matches what the header service has.
// if this is a reorg during our initial block sync, it is especially important
// to see if we've synced past where the reorg/fork took place.
@ -571,6 +574,7 @@ BlockService.prototype._onHeaders = function(callback) {
// we've checked the tip, handled any reorg for ourselves, and we are ready to
// sync any new blocks that might exist after our tip
self._reorging = false;
self._startSync();
// once we start syncing, we can call back to the header service, so that it can
@ -832,19 +836,24 @@ BlockService.prototype._onBlock = function(block, callback) {
var self = this;
if (self._reorging || self._serviceIniting) {
if (self._reorging) {
self._processingBlock = false;
return callback();
}
self._processingBlock = true;
self._getBlock(block.rhash(), function(err, _block) {
if(err) {
self._processingBlock = false;
return self._handleError(err);
}
if (_block) {
self._processingBlock = false;
log.debug('Block Service: not syncing, block already in database.');
return setImmediate(callback);
return callback();
}
self._processBlock(block, callback);
@ -857,6 +866,7 @@ BlockService.prototype._processBlock = function(block, callback) {
var self = this;
if (self.node.stopping) {
self._processingBlock = false;
return callback();
}
@ -873,6 +883,7 @@ BlockService.prototype._processBlock = function(block, callback) {
// we will have a chance to clear out our block queue, check our tip,
// discover where to reorg to, reorg all the services that rely on
// blocks and sync from there.
self._processingBlock = false;
return callback();
};
@ -895,17 +906,26 @@ BlockService.prototype._saveBlock = function(block, callback) {
}, function(err, ops) {
if (err) {
self._processingBlock = false;
return callback(err);
}
self._db.batch(_.compact(_.flattenDeep(ops)), function(err) {
if (err) {
self._processingBlock = false;
return callback(err);
}
self._recentBlockHashes.set(block.rhash(), bcoin.util.revHex(block.prevBlock));
self._setTip({ hash: block.rhash(), height: self._tip.height + 1 }, callback);
self._setTip({ hash: block.rhash(), height: block.__height }, function(err) {
if (err) {
self._processingBlock = false;
return callback(err);
}
self._processingBlock = false;
callback();
});
});
});
@ -924,6 +944,7 @@ BlockService.prototype._syncBlock = function(block) {
clearTimeout(self._getBlocksTimer);
if (self._lastBlockSaved === block.rhash()) {
self._processingBlock = false;
return;
}
@ -991,10 +1012,6 @@ BlockService.prototype._onSynced = function() {
clearInterval(this._reportInterval);
}
if (this._serviceIniting) {
return;
}
self._logProgress();
self._initialSync = false;
self._startBlockSubscription();
@ -1024,19 +1041,23 @@ BlockService.prototype._sync = function() {
var self = this;
if (self.node.stopping) {
if (self.node.stopping || self._reorging) {
return;
}
self._processingBlock = true;
log.debug('Block Service: querying header service for next block using tip: ' + self._tip.hash);
self._header.getEndHash(self._tip, self._readAheadBlockCount, function(err, targetHash, endHash) {
if(err) {
self._processingBlock = false;
return self._handleError(err);
}
if (!targetHash && !endHash) {
self._processingBlock = false;
return self.emit('synced');
}
@ -1048,7 +1069,11 @@ BlockService.prototype._sync = function() {
// then we must assume we've reorg'ed very shortly after
// we made this call and we should re-compute where we are
self._getBlocksTimer = setTimeout(function() {
self.emit('next block');
log.debug('Block Service: block timeout, emitting for next block');
self._processingBlock = false;
if (!self._reorging) {
self.emit('next block');
}
}, 5000);
self._getBlocksTimer.unref();