From b4fdfa3d42511ebb0c4dde5ead84c874ab834699 Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Fri, 11 Aug 2017 18:00:19 -0400 Subject: [PATCH] Fixed minor sync issue. --- lib/services/block/index.js | 77 +++++++++++++++++------------------- lib/services/header/index.js | 46 +++++++++++++++------ 2 files changed, 71 insertions(+), 52 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 57954569..352a08b8 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -24,10 +24,9 @@ var BlockService = function(options) { this._subscriptions.block = []; this._subscriptions.reorg = []; - this._unprocessedBlocks = []; - this._blockCount = 0; - this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.getNetworkName()]; + this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; + this._initialSync = true; }; inherits(BlockService, BaseService); @@ -212,8 +211,9 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback var count = 0; var _oldTip = this._tip.hash; var _newTip = hash; + var oldBlocks = []; - assert(_newTip && _oldTip, 'current chain and/or new chain do not exist in our list of chain tips.'); + assert(_oldTip, 'We don\'t have a tip hash to reorg away from!'); async.whilst( // test case @@ -225,22 +225,32 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback // get block function(next) { - // old tip has to be in database + // old tip (our current tip) has to be in database self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, data) { if (err || !data) { return next(err || new Error('missing block')); } + // once we've found the old tip, we will find its prev and check to see if matches new tip's prev var block = self._encoding.decodeBlockValue(data); + + // we will squirrel away the block because our services will need to remove it after we've found the common ancestor + oldBlocks.push(block); + + // this is our current tip's prev hash _oldTip = bcoin.util.revHex(block.prevBlock); + + // our current headers have the correct state of the chain, so consult that for its prev hash var header = allHeaders.get(_newTip); if (!header) { return next(new Error('Header missing from list of headers')); } + // set new tip to the prev hash _newTip = header.prevHash; + next(); }); @@ -251,15 +261,8 @@ BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback return callback(err); } - self._getOldBlocks(_newTip, function(err, oldBlocks) { - - if (err) { - return callback(err); - } - - callback(null, hash, _newTip, oldBlocks); - - }); + var commonAncestorHash = _newTip; + callback(null, hash, commonAncestorHash, oldBlocks); }); }; @@ -298,9 +301,11 @@ BlockService.prototype._getHash = function(blockArg) { BlockService.prototype._handleReorg = function(hash, allHeaders) { + // hash is the hash of the new block that we are reorging to. + assert(hash, 'We were asked to reorg to a non-existent hash.'); var self = this; - self._reorging = true; // while self is set, we won't be sending blocks + self._reorging = true; log.warn('Block Service: Chain reorganization detected! Our current block tip is: ' + self._tip.hash + ' the current block: ' + hash + '.'); @@ -319,7 +324,7 @@ BlockService.prototype._handleReorg = function(hash, allHeaders) { } var commonAncestorHeader = allHeaders.get(commonAncestorHash); - log.warn('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader + '.'); + log.warn('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader); self._broadcast(self.subscriptions.reorg, 'block/reorg', [commonAncestorHeader, oldBlocks]); @@ -393,18 +398,16 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList) { }; -BlockService.prototype._onAllHeaders = function(headers) { - this._bestHeight = headers.size; +BlockService.prototype._onAllHeaders = function() { this._startSync(); }; -BlockService.prototype._processBlock = function() { +BlockService.prototype._processBlock = function(block) { var self = this; var operations = []; var services = self.node.services; - var block = self._unprocessedBlocks.shift(); var headers = self._header.getAllHeaders(); async.eachSeries( @@ -471,36 +474,27 @@ BlockService.prototype.onBlock = function(block, callback) { setImmediate(function() { callback(null, [{ - type: 'put', - key: self._encoding.encodeBlockKey(block.rhash()), - value: self._encoding.encodeBlockValue(block) + type: 'put', + key: self._encoding.encodeBlockKey(block.rhash()), + value: self._encoding.encodeBlockValue(block) }]); }); }; -BlockService.prototype._onBlock = function(block, header) { +BlockService.prototype._onBlock = function(block) { if (this.node.stopping || this._reorging) { return; } + // this service must receive blocks in order + var prevHash = bcoin.util.revHex(block.prevBlock); + if (this._tip.hash !== prevHash) { + return; + } + log.debug('Block Service: new block: ' + block.rhash()); - - // if this block's height is not what we expect, do not process. - if (header.height !== this._tip.height + 1) { - log.debug('Block Service: New block appears to be a newer block than we can handle, skipping.'); - return; - } - - var reorg = this._detectReorg(block); - if (reorg) { - log.debug('Block Service: detected a reorg during onBlock handler'); - this._handleReorg(block.rhash(), this._header.getAllHeaders()); - return; - } - - this._unprocessedBlocks.push(block); - this._processBlock(); + this._processBlock(block); }; @@ -509,7 +503,7 @@ BlockService.prototype._setListeners = function() { var self = this; self._header.once('headers', self._onAllHeaders.bind(self)); self._header.on('reorg', function(hash, headers) { - if (!self._reorging) { + if (!self._reorging && !this._initialSync) { log.debug('Block Service: detected a reorg from the header service.'); self._handleReorg(hash, headers); } @@ -536,6 +530,7 @@ BlockService.prototype._startSync = function() { }; BlockService.prototype._startSubscriptions = function() { + if (this._subscribed) { return; } diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 351831c1..d29d509c 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -11,6 +11,7 @@ var BN = require('bn.js'); var consensus = require('bcoin').consensus; var assert = require('assert'); var constants = require('../../constants'); +var bcoin = require('bcoin'); var HeaderService = function(options) { @@ -170,13 +171,21 @@ HeaderService.prototype.getPublishEvents = function() { HeaderService.prototype._onBlock = function(block) { var hash = block.rhash(); + var header = this._headers.get(hash); - log.debug('Header Service: new block: ' + hash); + if (!header) { + log.debug('Header Service: new block: ' + hash); - var header = block.toHeaders().toJSON(); - header.timestamp = header.ts; - header.prevHash = header.prevBlock; - this._saveHeaders([this._onHeader(header)]); + if (this._detectReorg()) { + this._handleReorg(); + return; + } + + header = block.toHeaders().toJSON(); + header.timestamp = header.ts; + header.prevHash = header.prevBlock; + this._saveHeaders([this._onHeader(header)]); + } for (var i = 0; i < this.subscriptions.block.length; i++) { var prevHeader = this._headers.get(header.prevHash); @@ -325,16 +334,31 @@ HeaderService.prototype._populateNextHashes = function() { }; -HeaderService.prototype._detectReorg = function() { +HeaderService.prototype._detectReorg = function(block) { + // this is a new block coming after we are sync'ed. + // for this not to be a reorg, this block's prev hash should be our tip + // in rare cases, we don't even have this block's parent in our collection + if (block) { + var prevHash = bcoin.util.revHex(block.prevBlock); + if (prevHash === this._tip.hash) { + return false; + } + + var parentHeader = this._headers.get(prevHash); + if (!parentHeader) { + log.warn('Block with hash: ' + block.rhash() + + ' is not in our header set. This could be a block delievered out of order or this block\'s parent: ' + + prevHash + ' has been orphaned before we synced our headers the last time.'); + return false; + } + + return true; + } // is our original tip's height and hash the same after we rewound by the checkpoint amount of blocks // and re-imported? If so, then we've reorg'ed since we've been shut down. var headerHash = this._headers.getIndex(this._originalTip.height).hash; - log.debug('Header Service: attempting to detect a reorg situation after fully syncing headers, original tip height is: ' + - this._originalTip.height + ' original tip hash: ' + this._originalTip.hash + ' headers list size: ' + this._headers.size + - ' hash at original tip height in headers list: ' + headerHash); - assert(headerHash, 'Expected a header to exist at height ' + this._originalTip.height); if (this._originalTip.hash !== headerHash) { @@ -401,7 +425,7 @@ HeaderService.prototype._getPersistedHeaders = function(callback) { var start = self._encoding.encodeHeaderKey(0); var end = self._encoding.encodeHeaderKey(startingHeight + 1); - log.debug('Getting persisted headers from genesis block to block ' + self._tip.height); + log.info('Getting persisted headers from genesis block to block ' + self._tip.height); var criteria = { gte: start,