From 09b365772c41ae209e52c63229bb1155752e1760 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Thu, 28 Sep 2017 11:01:57 -0400 Subject: [PATCH] more reorg stuff. --- lib/services/block/index.js | 57 +++++++++++++++++++++++------------- lib/services/header/index.js | 26 +++++++++++++--- 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 5648d902..df4598d3 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -25,8 +25,9 @@ var BlockService = function(options) { this._blockCount = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._initialSync = true; - this._reorgBackToBlock = null; // use this to rewind your indexes to a specific point by height or hash + this._reorgBackToBlock = 1202419; // use this to rewind your indexes to a specific point by height or hash this._timeOfLastBlockReport = Date.now() - 30000; + this._blocksInQueue = 0; }; inherits(BlockService, BaseService); @@ -188,7 +189,6 @@ BlockService.prototype._checkTip = function(callback) { return callback(); } - log.warn('Block Service: reorganization spotted...'); self._findCommonAncestor(function(err, commonAncestorHash) { if(err) { return callback(err); @@ -361,6 +361,7 @@ BlockService.prototype._queueBlock = function(block) { var self = this; + self._blocksInQueue++; self._blockProcessor.push(block, function(err) { if (err) { @@ -375,6 +376,8 @@ BlockService.prototype._queueBlock = function(block) { log.debug('Block Service: completed processing block: ' + block.rhash() + ' prev hash: ' + bcoin.util.revHex(block.prevBlock) + ' height: ' + self._tip.height); + self._blocksInQueue--; + }); }; @@ -490,6 +493,12 @@ BlockService.prototype.onHeaders = function(callback) { var self = this; + // when this fires, we switch into initial sync mode (like first boot) + // this includes finishing any blocks that may be in the queue, then + // checking the tip, etc. + // we may be in a reorg situation at the time this function runs + // because the header service runs this function after it completes its + // reorg routines. self._removeAllSubscriptions(); self._resetTip(function(err) { @@ -498,7 +507,7 @@ BlockService.prototype.onHeaders = function(callback) { } async.retry(function(next) { - next(self._blockProcessor.length() !== 0); + next(self._blocksInQueue > 0); }, function() { @@ -535,6 +544,9 @@ BlockService.prototype._handleReorg = function(commonAncestorHash, callback) { var self = this; + // we want to ensure that we can reask for previously delievered inventory + self._p2p.clearInventoryCache(); + log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' + self._tip.hash + ' common ancestor hash: ' + commonAncestorHash); @@ -643,18 +655,16 @@ BlockService.prototype._processBlock = function(block, callback) { // common case if (!self._detectReorg(block)) { - return setImmediate(function() { - self._saveBlock(block, callback); - }); + return self._saveBlock(block, callback); } - // reorg - self._handleReorg(bcoin.util.revHex(block.prevBlock), function(err) { - if(err) { - return callback(err); - } - self._saveBlock(block, callback); - }); + // reorg -- in this case, we will not handle the reorg right away + // instead, we will skip the block and wait for the eventual call to + // "onHeaders" function. When the header service calls this function, + // we will have a chance to clear out our block queue, check our tip, + // discover where to reorg to, reorg all the services that rely on + // blocks and sync from there. + return callback(); }; @@ -683,13 +693,16 @@ BlockService.prototype._saveBlock = function(block, callback) { return callback(err); } - self._setTip({ hash: block.rhash(), height: self._tip.height + 1 }); - var tipOps = utils.encodeTip(self._tip, self.name); + var tipOps = utils.encodeTip({ + hash: block.rhash(), + height: self._tip.height + 1 + }, self.name); self._db.put(tipOps.key, tipOps.value, function(err) { if(err) { return callback(err); } + self._setTip({ hash: block.rhash(), height: self._tip.height + 1 }); callback(); }); }); @@ -753,6 +766,8 @@ BlockService.prototype._startSync = function() { if (numNeeded > 0) { this.on('next block', this._sync.bind(this)); this.on('synced', this._onSynced.bind(this)); + clearInterval(this._reportInterval); + this._reportingInterval = setInterval(this._reportStatus.bind(this), 5000); return this._sync(); } @@ -760,6 +775,13 @@ BlockService.prototype._startSync = function() { }; +BlockService.prototype._reportStatus = function() { + if (this._tip.height % 144 === 0 || Date.now() - this._timeOfLastBlockReport > 10000) { + this._timeOfLastBlockReport = Date.now(); + this._logProgress(); + } +}; + BlockService.prototype._sync = function() { var self = this; @@ -768,11 +790,6 @@ BlockService.prototype._sync = function() { return; } - if (self._tip.height % 144 === 0 || Date.now() - self._timeOfLastBlockReport > 10000) { - self._timeOfLastBlockReport = Date.now(); - self._logProgress(); - } - self._header.getNextHash(self._tip, function(err, targetHash, nextHash) { if(err) { diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 416337c6..7a6658d1 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -307,6 +307,10 @@ HeaderService.prototype._persistHeader = function(block, callback) { return self._syncBlock(block, callback); } + // this will lead to rechecking the network for more headers after our last header + // and calling on=Headers on each service that implements it + self._initialSync = true; + self._findCommonAncestor(block, function(err, commonHeader) { if (err || !commonHeader) { @@ -505,14 +509,16 @@ HeaderService.prototype._onHeadersSave = function(callback) { return callback(); } - self._endHeaderSubscription(); - self._startBlockSubscription(); + self._endHeaderSubscription(); // we don't need headers any more + self._startBlockSubscription(); // we need new blocks coming tu us aynchronuously + self._setBestHeader(); if (!self._initialSync) { return callback(); } + // this will happen after an inital start up and sync -and- also after a chain reorg log.info('Header Service: sync complete.'); self._initialSync = false; @@ -520,7 +526,7 @@ HeaderService.prototype._onHeadersSave = function(callback) { if (service.onHeaders) { return service.onHeaders.call(service, next); } - setImmediate(next); + next(); }, callback); }; @@ -623,15 +629,23 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) { HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, callback) { // remove all headers with a height greater than commonHeader + + assert(this._tip.hash !== commonHeader.hash, 'Header Service: we were asked to reorg from and to the same hash.'); + var ops = []; var startingHeight = this._tip.height; var hash = this._tip.hash; var headerCount = 0; + while(hash !== commonHeader.hash) { + headerCount++; + var header = headers.getIndex(startingHeight--); + assert(header, 'Expected to have a header at this height, but did not. Reorg failed.'); hash = header.prevHash; + ops.push({ type: 'del', key: this._encoding.encodeHeaderHashKey(header.hash) @@ -640,7 +654,9 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, type: 'del', key: this._encoding.encodeHeaderHeightKey(header.height) }); + } + // setting our tip to the common ancestor this._tip.hash = commonHeader.hash; this._tip.height = commonHeader.height; @@ -805,6 +821,7 @@ HeaderService.prototype._handleLowTipHeight = function() { // our peer has reorg'ed to lower overall height. // we should get the first block after the split, reorg back to this height and then continue. + // we should still have no listeners for anything, blocks, headers, etc. at this point self._p2p.getP2PBlock({ filter: { startHash: reorgInfo.commonHeader.hash, @@ -813,6 +830,7 @@ HeaderService.prototype._handleLowTipHeight = function() { blockHash: reorgInfo.blockHash }, function(block) { + self._initialSync = true; self._handleReorg(block, reorgInfo.commonHeader, function(err) { if(err) { @@ -873,7 +891,7 @@ HeaderService.prototype._getP2PHeaders = function(hash) { HeaderService.prototype._sync = function() { - this._startHeaderSubscription(); + this._startHeaderSubscription(); // ensures only one listener will ever be registered this._getP2PHeaders(this._tip.hash); };