From 66e82a3fe704862df203e544dc87476f491ac5d9 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Tue, 19 Sep 2017 08:48:49 -0400 Subject: [PATCH] Fixed edge case where new peer has unexpectedly low number of blocks. --- lib/services/block/index.js | 15 ++- lib/services/header/index.js | 210 +++++++++++++++++++++++++---------- lib/services/p2p/index.js | 4 +- 3 files changed, 166 insertions(+), 63 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 6be4d1bd..055f37b5 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -470,6 +470,14 @@ BlockService.prototype._onAllHeaders = function() { // once the header service has all of its headers, we know we can check our // own tip for consistency and make sure our it is on the mainchain var self = this; + if (self._syncing) { + return; + } + + if (!self._initialSync) { + return self._sync(); + } + self._checkTip(function(err) { if(err) { @@ -480,6 +488,7 @@ BlockService.prototype._onAllHeaders = function() { self._startSync(); }); + }; @@ -606,7 +615,7 @@ BlockService.prototype.onBlock = function(block, callback) { BlockService.prototype._onBlock = function(block) { - if (this.node.stopping || this._reorging) { + if (this.node.stopping || this._reorging || this._tip.hash === block.rhash()) { return; } @@ -629,7 +638,7 @@ BlockService.prototype._setListeners = function() { var self = this; - self._header.once('headers', self._onAllHeaders.bind(self)); + self._header.on('headers', self._onAllHeaders.bind(self)); self._header.on('reorg', function(block, headers) { @@ -655,7 +664,7 @@ BlockService.prototype._setListeners = function() { }, function(err) { if (err) { log.error(err); - self.nodes.stop(); + self.node.stop(); } }); diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 9458f625..ca35d76c 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -30,7 +30,6 @@ var HeaderService = function(options) { this.blockServiceSyncing = true; this.lastBlockQueried = null; this._initialSync = true; - this._blockQueue = []; }; inherits(HeaderService, BaseService); @@ -206,6 +205,9 @@ HeaderService.prototype.start = function(callback) { return callback(err); } + // set block worker queue, concurrency 1 + self._blockProcessor = async.queue(self._processBlocks.bind(self)); + self._setListeners(); self._bus = self.node.openBus({remoteAddress: 'localhost-header'}); self._startHeaderSubscription(); @@ -223,11 +225,6 @@ HeaderService.prototype.stop = function(callback) { this._headerInterval = null; } - if (this._blockProcessor) { - clearInterval(this._blockProcessor); - this._blockProcessor = null; - } - callback(); }; @@ -252,56 +249,43 @@ HeaderService.prototype.getPublishEvents = function() { }; -// block handler for blocks from the p2p network HeaderService.prototype._queueBlock = function(block) { - // this block was queried by the block service and, thus, we already have it - if (block.rhash() === this.lastBlockQueried) { - return; - } - - // queue the block for _processBlock to process later - // we won't start processing blocks until we have all of our headers - // so if loading headers takes a really long time, we could have a long - // list of blocks. Warning will be logged if this happens. - this._blockQueue.push(block); - -}; - - -// we need this in case there is a deluge of blocks from a peer, asynchronously -HeaderService.prototype._processBlocks = function() { - var self = this; - var block = self._blockQueue.shift(); - - if (self._blockQueue.length > 2) { - // normally header sync is pretty quick, within a minute or two. - // under normal circumstances, we won't queue many blocks - log.warn('Header Service: Block queue has: ' + self._blockQueue.length + ' items.'); - } - - if (!block) { + if (block.rhash() === self.lastBlockQueried) { return; } - assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.'); - - clearInterval(self._blockProcessor); - self._blockProcessor = null; - - self._persistHeader(block, function(err) { + self._blockProcessor.push(block, function(err) { if (err) { log.error(err); return self.node.stop(); } + log.info('Header Service: completed processing block: ' + block.rhash() + ' prev hash: ' + bcoin.util.revHex(block.prevBlock)); + + }); + +}; + +HeaderService.prototype._processBlocks = function(block, callback) { + + var self = this; + + assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.'); + + self._persistHeader(block, function(err) { + + if (err) { + return callback(err); + } + if (!self.blockServiceSyncing) { self._broadcast(block); } - self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); + callback(); }); @@ -324,7 +308,7 @@ HeaderService.prototype._persistHeader = function(block, callback) { } log.warn('Header Service: Reorganization detected, current tip hash: ' + - self._tip.hash + ' new block causing the reorg: ' + block.rhash() + + self._tip.hash + ', new block causing the reorg: ' + block.rhash() + ' common ancestor hash: ' + commonHeader.hash); self._handleReorg(block, commonHeader, function(err) { @@ -389,6 +373,7 @@ HeaderService.prototype._onHeader = function(header) { header.timestamp = header.time; } + console.log('about to set last header from onHeader function'); this._lastHeader = header; return [ @@ -425,6 +410,8 @@ HeaderService.prototype._onHeaders = function(headers) { header = header.toObject(); + assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height); + var ops = self._onHeader(header); dbOps = dbOps.concat(ops); @@ -475,11 +462,6 @@ HeaderService.prototype._onHeadersSave = function(err) { } - if (self._initialSync) { - // we'll turn the block processor on right after we've sync'ed headers - self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); - } - self._startBlockSubscription(); self._setBestHeader(); @@ -635,6 +617,7 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, // setting our tip to the common ancestor this._tip.hash = commonHeader.hash; this._tip.height = commonHeader.height; + console.log('about to set last header from reorg function'); this._lastHeader = commonHeader; this._db.batch(ops, callback); @@ -642,13 +625,11 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, HeaderService.prototype._setListeners = function() { - this._p2p.once('bestHeight', this._onBestHeight.bind(this)); + this._p2p.on('bestHeight', this._onBestHeight.bind(this)); }; HeaderService.prototype._onBestHeight = function(height) { - assert(height >= this._tip.height, 'Our peer does not seem to be fully synced: best height: ' + - height + ' tip height: ' + this._tip.height); log.debug('Header Service: Best Height is: ' + height); this._bestHeight = height; this._startSync(); @@ -656,11 +637,122 @@ HeaderService.prototype._onBestHeight = function(height) { HeaderService.prototype._startSync = function() { - this._numNeeded = this._bestHeight - this._tip.height; + var self = this; + // if our tip height is less than the best height of this peer, then: + // 1. the peer is not fully synced. + // 2. the peer has reorg'ed and we need to handle this - log.info('Header Service: Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); + // unsub from listening for blocks + // ensure the blockProcessor is finished processing blocks (empty queue) + // then proceed with gathering new set(s) of headers + + self._bus.unsubscribe('p2p/block'); + + async.retry(function(next) { + + next(self._blockProcessor.length !== 0); + + }, function() { + + self._numNeeded = self._bestHeight - self._tip.height; + + if (self._numNeeded > 0) { + log.info('Header Service: Gathering: ' + self._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); + self._sync(); + } else if (self._numNeeded < 0) { + self._handleLowTipHeight(); + } + + }); + +}; + +HeaderService.prototype._findReorgConditionInNewPeer = function(callback) { + var self = this; + + var allHeaders = ; + var headerCount = 0; + var tipHash = self.GENESIS_HASH; + + self._bus.removeAllListeners(); + + self._bus.on('p2p/headers', function(headers) { + + headers.forEach(function(header) { + allHeaders[header.hash] = header.prevHash; + headerCount++; + }); + + if (headerCount < self._bestHeight) { + return self._p2p.getHeaders({ startHash: tipHash }); + } + + self.getAllHeaders(function(err, allHeaders) { + + if (err) { + return callback(err); + } + + if (allHeaders.get( + + }); + + }); + + +}; + +HeaderService.prototype._handleLowTipHeight = function() { + var self = this; + + log.warn('Header Service: Connected Peer has a best height (' + self._bestHeight + ') which is lower than our tip height (' + + self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' + + ' Checking the new peer\'s header for a reorganization event.'); + + self._findReorgConditionInNewPeer(function(err, reorgInfo) { + + if (err) { + log.error(err); + return self.node.stop(); + } + + // case 1: there is a reorg condition in the new peer + if (reorgInfo) { + return self._p2p.getP2PBlock({ + filter: { + startHash: reorgInfo.commonHeader.hash, + endHash: 0 + }, + blockHash: reorgInfo.blockHash + }, function(block) { + + self._handleReorg(block, reorgInfo.commonHeader, function(err) { + + if(err) { + log.error(err); + return self.node.stop(); + } + + self._syncBlock(block, function(err) { + + if (err) { + log.error(err); + return self.node.stop(); + } + + self._sync(); + + }); + + }); + }); + + } + + // case 2: the new peer is just not synced up, so we must wait + self._sync(); + }); - this._sync(); }; @@ -690,12 +782,14 @@ HeaderService.prototype._sync = function() { self._p2p.getHeaders({ startHash: self._tip.hash }); - // when connecting to a peer that isn't yet responding to getHeaders, we will start a interval timer - // to retry until we can get headers, this may be a very long interval - self._headerInterval = setInterval(function() { - log.info('Header Service: retrying get headers since ' + self._tip.hash); - self._p2p.getHeaders({ startHash: self._tip.hash }); - }, 2000); + if (!self._headerInterval) { + // when connecting to a peer that isn't yet responding to getHeaders, we will start a interval timer + // to retry until we can get headers, this may be a very long interval + self._headerInterval = setInterval(function() { + log.info('Header Service: retrying get headers since ' + self._tip.hash); + self._p2p.getHeaders({ startHash: self._tip.hash }); + }, 2000); + } }; diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 47fe7a58..09b56004 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -51,7 +51,7 @@ P2P.prototype.getNumberOfPeers = function() { P2P.prototype.getP2PBlock = function(opts, callback) { // opts is { filter: {}, blockHash: block hash we want } - this._currentRequest = opts.filter; + this._currentRequest = { opts: opts, callback: callback }; var peer = this._getPeer(); @@ -344,7 +344,7 @@ P2P.prototype._onPeerReady = function(peer, addr) { if (this._currentRequest) { log.info('Restarting last query'); this.clearInventoryCache(); - this.getP2PBlock(this._currentRequest); + this.getP2PBlock(this._currentRequest.opts, this._currentRequest.callback); } if (bestHeight >= 0) {