From 3dd9aea3ddf242f6040c23be70fd51f2128e6d9d Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 1 Sep 2017 15:56:42 -0400 Subject: [PATCH] Fixed issue where new blocks were not indexed by header service. --- lib/services/block/index.js | 2 +- lib/services/header/index.js | 264 ++++++++++++++++------------- test/services/header/index.unit.js | 2 + 3 files changed, 146 insertions(+), 122 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 4b1715dc..beee0282 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -616,7 +616,6 @@ BlockService.prototype._sync = function() { var self = this; - if (self.node.stopping) { return; } @@ -639,6 +638,7 @@ BlockService.prototype._sync = function() { return; } + self._header.lastBlockQueried = self._tip.hash; self._p2p.getBlocks({ startHash: self._tip.hash, endHash: hash }); }); diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 1040642a..9479a583 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -28,6 +28,7 @@ var HeaderService = function(options) { this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._lastHeader = null; this.blockServiceSyncing = true; + this.lastBlockQueried = null; this._blockQueue = []; }; @@ -198,21 +199,32 @@ HeaderService.prototype.start = function(callback) { }, ], function(err) { - if (err) { - return callback(err); - } + if (err) { + return callback(err); + } - self._setListeners(); - self._bus = self.node.openBus({remoteAddress: 'localhost-header'}); - self._startHeaderSubscription(); - callback(); + self._setListeners(); + self._bus = self.node.openBus({remoteAddress: 'localhost-header'}); + self._startHeaderSubscription(); + + callback(); }); }; HeaderService.prototype.stop = function(callback) { + + if (this._headerInterval) { + clearInterval(this._headerInterval); + } + + if (this._blockProcessor) { + clearInterval(this._blockProcessor); + } + callback(); + }; HeaderService.prototype._startHeaderSubscription = function() { @@ -235,134 +247,121 @@ HeaderService.prototype.getPublishEvents = function() { }; +// Called by the p2p network when a block arrives. // If blocks arrive in rapid succession from the p2p network, -// then this handler could not be finished saving the previous block info -// this should be a rare case since blocks aren't usually mined milliseconds apart +// then this handler could be in the middle of saving a header leading to an erroroneous reorg HeaderService.prototype._queueBlock = function(_block) { var self = this; - // mode = syncing, this header already saved - if (self.blockServiceSyncing) { - - return self._broadcast(_block); - - } - - // mode = fully synced, live blocks arriving - if (!self._blockProcessor) { - self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); - } - - // we just need to queue the block, the interval processor will take it from there - self._blockQueue.push(_block); - -}; - - -// this gets fired on a timer to process new blocks from the queue -// under normal circumstances, the queue will only have 1-2 items in it, probably less -// we need this in case there is a deluge of blocks from a peer, asynchronously -// we will also just process one block per call to keep things simple -HeaderService.prototype._processBlocks = function() { - - var self = this; - - // if we have no blocks to process, exit - var block = self._blockQueue.shift(); - - if (!block) { - return; - } - - // if the block service is busy, then exit and wait for the next call - if (self.blockServiceSyncing) { - return; - } - - // clear the interval because this process might span intervals - // we don't want to turn this back on until we know the block header has been persisted - clearInterval(self._blockProcessor); - - // let's see if this de-queued block is in order and not reorging the chain - var prevHash = bcoin.util.revHex(block.prevBlock); - var newBlock = prevHash === self._lastHeader.hash; - - - // common case, blocks arriving in order and not reorging - if (newBlock) { - return self._onSyncBlock(block); - } - - // if we are here, we've got special things to do. This is almost always a reorg sitch - self._detectAndHandleReorg(block); - -}; - -HeaderService.prototype._onSyncBlock = function(block) { - - var self = this; - - - self._syncBlock(block, function(err) { - - if (err) { - log.error(err); - self.node.stop(); - } - - self._broadcast(block); - - // at this point we know the block header has been persisted, but - // not necessarily that the services are done syncing - self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); - - }); -}; - -HeaderService.prototype._detectAndHandleReorg = function(block) { - - var self = this; - - // this is the rare case that a block comes to us out of order or is a reorg'ed block - self._detectReorg(block, function(err, reorg) { + // in all cases, we want to ensure the header is saved + self._persistHeader(_block, function(err) { if (err) { log.error(err); return self.node.stop(); } - // if not reorg, then this block is out of order and we can't do anything with it - // TODO: does this ever happen? Can this ever happen? - if (!reorg) { - return log.warn('Block: ' + block.rhash() + ' arrived out of order, skipping.'); + if (self.blockServiceSyncing) { + // if we are syncing, we can broadcast right away + return self._broadcast(_block); } - var header = block.toHeaders().toJSON(); - header.timestamp = header.ts; - header.prevHash = header.prevBlock; + // queue the block for _processBlock to process later + self._blockQueue.push(_block); - self._handleReorg(block, header, function(err) { + // if we don't already have a timer, set one here to get things going + if (!self._blockProcessor) { - if (err) { - log.error(err); - return self.node.stop(); - } + self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); - self._onSyncBlock(block); - - }); + } }); + +}; + + +// under normal circumstances, the queue will only have 1-2 items in it, probably less +// we need this in case there is a deluge of blocks from a peer, asynchronously +HeaderService.prototype._processBlocks = function() { + + // if we have no blocks to process, exit + var block = this._blockQueue.shift(); + + if (!block) { + return; + } + + // if the block service is busy, then exit and wait for the next call + // we can leave this interval handler enabled because the block service should set the + // syncing flag while is it processing blocks + if (this.blockServiceSyncing) { + return; + } + + this._broadcast(block); +}; + +HeaderService.prototype._persistHeader = function(block, callback) { + + var self = this; + + self._detectReorg(block, function(err, reorgStatus, historicalBlock) { + + if (err) { + return callback(err); + } + + // common case + if (historicalBlock) { + return callback(); + } + + // new block, not causing a reorg + if (!reorgStatus && !historicalBlock) { + + return self._syncBlock(block, function(err) { + + if (err) { + return callback(err); + } + + if (!self.blockServiceSyncing && !self._blockProcessor) { + self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000); + } + + callback(); + + }); + } + + // new block causing a reorg + self._handleReorg(block, self._formatHeader(block), function(err) { + + if (err) { + return callback(err); + } + + }); + }); + +}; + +HeaderService.prototype._formatHeader = function(block) { + + var header = block.toHeaders().toJSON(); + header.timestamp = header.ts; + header.prevHash = header.prevBlock; + return header; + }; HeaderService.prototype._syncBlock = function(block, callback) { var self = this; - var header = block.toHeaders().toJSON(); - header.timestamp = header.ts; - header.prevHash = header.prevBlock; + var header = self._formatHeader(block); log.debug('Header Service: new block: ' + block.rhash()); @@ -392,9 +391,11 @@ HeaderService.prototype._onHeader = function(header) { header.height = this._lastHeader.height + 1; header.chainwork = this._getChainwork(header, this._lastHeader).toString(16, 64); + if (!header.timestamp) { header.timestamp = header.time; } + this._lastHeader = header; return [ @@ -529,11 +530,11 @@ HeaderService.prototype._syncComplete = function() { HeaderService.prototype._setBestHeader = function() { - var bestHeader = this._lastHeader; - this._tip.height = bestHeader.height; - this._tip.hash = bestHeader.hash; + var bestHeader = this._lastHeader; + this._tip.height = bestHeader.height; + this._tip.hash = bestHeader.hash; - log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.'); + log.debug('Header Service: ' + bestHeader.hash + ' is the best block hash.'); }; HeaderService.prototype._getHeader = function(height, hash, callback) { @@ -572,8 +573,22 @@ HeaderService.prototype._getHeader = function(height, hash, callback) { HeaderService.prototype._detectReorg = function(block, callback) { - assert(block, 'Block is needed to detect reorg.'); + var self = this; + var prevHash = bcoin.util.revHex(block.prevBlock); + var nextBlock = prevHash === self._lastHeader.hash; + + // this is the last block the block service asked for + if (prevHash === this.lastBlockQueried) { + return callback(null, false, true); + } + + // new block, not a historical block + if (nextBlock) { + return callback(null, false, false); + } + + // could really be a reorg block var key = this._encoding.encodeHeaderHashKey(bcoin.util.revHex(block.prevBlock)); this._db.get(key, function(err, val) { @@ -584,10 +599,10 @@ HeaderService.prototype._detectReorg = function(block, callback) { // is this block's prevHash already referenced in the database? If so, reorg if (val) { - return callback(null, true); + return callback(null, true, false); } - callback(null, false); + callback(null, false, false); }); @@ -626,7 +641,12 @@ HeaderService.prototype._handleReorg = function(block, header, callback) { return callback(err || new Error('Missing headers')); } - var hash = headers.getIndex(self._originalTip.height).hash; + var header = headers.getIndex(self._originalTip.height); + + assert(header, 'Atempted to get reorg header for the original tip: ' + self._originalTip.hash + + ' at height: ' + self._originalTip.height + ' but could not find in the database.'); + + var hash = header.hash; if (block && header) { hash = block.rhash(); @@ -674,7 +694,9 @@ HeaderService.prototype._sync = function() { var self = this; var progress; - if (self._bestHeight === 0) { + var bestHeight = Math.max(self._bestHeight, self._lastHeader.height); + + if (bestHeight === 0) { progress = 0; } else { progress = (self._tip.height / self._bestHeight*100.00).toFixed(2); @@ -739,7 +761,7 @@ HeaderService.prototype.getNextHash = function(tip, callback) { }; HeaderService.prototype.getLastHeader = function() { - assert(this._lastHeader, 'Last headers should be populated.'); + assert(this._lastHeader, 'Last header should be populated.'); return this._lastHeader; }; diff --git a/test/services/header/index.unit.js b/test/services/header/index.unit.js index 4ff052c7..b423da78 100644 --- a/test/services/header/index.unit.js +++ b/test/services/header/index.unit.js @@ -102,6 +102,7 @@ describe('Header Service', function() { headerService._tip = { height: 120 }; var getHeaders = sandbox.stub(); headerService._p2p = { getHeaders: getHeaders }; + headerService._lastHeader = { height: 124 }; headerService._startSync(); expect(getHeaders.calledOnce).to.be.true; }); @@ -115,6 +116,7 @@ describe('Header Service', function() { headerService._tip = { height: 121, hash: 'a' }; var getHeaders = sandbox.stub(); headerService._p2p = { getHeaders: getHeaders }; + headerService._lastHeader = { height: 124 }; headerService._sync(); expect(getHeaders.calledOnce).to.be.true; });