From a5f9d1a6d04f415972b8b17cd3c2b9f5f8c34f1e Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Sun, 24 Sep 2017 12:34:27 -0400 Subject: [PATCH] Fixed edge case reorg issues. --- lib/services/block/index.js | 652 ++++++++++++++++------------- lib/services/header/index.js | 200 +++++---- lib/services/p2p/index.js | 11 +- lib/services/timestamp/index.js | 10 +- test/services/block/index.unit.js | 102 ++--- test/services/header/index.unit.js | 31 +- 6 files changed, 505 insertions(+), 501 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 05a97f87..331636a9 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -10,6 +10,7 @@ var utils = require('../../utils'); var assert = require('assert'); var constants = require('../../constants'); var bcoin = require('bcoin'); +var _ = require('lodash'); var BlockService = function(options) { @@ -23,7 +24,6 @@ var BlockService = function(options) { this._subscriptions = {}; this._subscriptions.block = []; - this._subscriptions.reorg = []; this._blockCount = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; @@ -168,35 +168,62 @@ BlockService.prototype.getRawBlock = function(hash, callback) { }; BlockService.prototype._checkTip = function(callback) { - // check to see if our own tip is no longer in the main chain - // (there was a reorg while we were shut down) + var self = this; self._header.getBlockHeader(self._tip.height, function(err, header) { - if (err || !header) { - return callback(err || new Error('Header at height: ' + self._tip.height + ' was not found.')); + if (err) { + return callback(err); } + header = header || self._header.getLastHeader(); + if (header.hash === self._tip.hash) { + log.info('Block Service: saved tip is good to go.'); return callback(); } - // means the header service no longer tracks our tip on the main chain - // so we should consider ourselves in a reorg situation - self._header.getAllHeaders(function(err, headers) { - - if(err || !headers) { - return callback(err || new Error('All headers not found.')); + self._findCommonAncestor(function(err, commonAncestorHash) { + if(err) { + return callback(err); } - - self._handleReorg(header.hash, headers, callback); - + self._handleReorg(commonAncestorHash, callback); }); }); }; +BlockService.prototype._findCommonAncestor = function(callback) { + + var self = this; + var hash = self._tip.hash; + + self._header.getAllHeaders(function(err, headers) { + + if(err || !headers) { + return callback(err || new Error('headers required.')); + } + + async.until(function() { + return headers.get(hash); + }, function(next) { + self._getBlock(hash, function(err, block) { + if(err) { + return next(err); + } + hash = bcoin.util.revHex(block.prevBlock); + next(); + }); + }, function(err) { + if(err) { + return callback(err); + } + callback(null, hash); + }); + }); +}; + BlockService.prototype.start = function(callback) { var self = this; @@ -216,9 +243,12 @@ BlockService.prototype.start = function(callback) { return callback(err); } + self._blockProcessor = async.queue(self._onBlock.bind(self)); + + self._setListeners(); assert(tip.height >= 0, 'tip is not initialized'); self._setTip(tip); - self._setListeners(); + self._bus = self.node.openBus({remoteAddress: 'localhost-block'}); callback(); }); @@ -230,9 +260,24 @@ BlockService.prototype.stop = function(callback) { }; BlockService.prototype.subscribe = function(name, emitter) { - this._subscriptions[name].push(emitter); log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length); +}; + +BlockService.prototype._queueBlock = function(block) { + + var self = this; + + self._blockProcessor.push(block, function(err) { + + if (err) { + return self._handleError(err); + } + + log.debug('Block Service: completed processing block: ' + block.rhash() + + ' prev hash: ' + bcoin.util.revHex(block.prevBlock) + ' height: ' + self._tip.height); + + }); }; @@ -267,48 +312,7 @@ BlockService.prototype._broadcast = function(subscribers, name, entity) { }; BlockService.prototype._detectReorg = function(block) { - var prevHash = bcoin.util.revHex(block.prevBlock); - if (this._tip.hash !== prevHash) { - return true; - } - return false; -}; - -BlockService.prototype._getOldHeaders = function(commonAncestorHeader, allHeaders) { - - var header = allHeaders.get(this._tip.hash); - assert(header, 'Header needed to find old blocks during reorg.'); - var headers = []; - - while (header.height > commonAncestorHeader.height) { - headers.push(header); - header = allHeaders.get(header.prevHash); - } - - return headers; - -}; - -BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) { - - // the prev hash of the passed in "hash" should also point to a block who has the same prev hash. - - var self = this; - - var reorgHeader = allHeaders.get(hash); - assert(reorgHeader, 'No reorg header found, cannot find common ancestor.'); - - var prevHash = reorgHeader.prevHash; - - self.getBlock(prevHash, function(err, block) { - - if (err) { - return callback(err); - } - - var commonAncestorHeader = allHeaders.get(block.rhash()); - callback(null, commonAncestorHeader); - }); + return bcoin.util.revHex(block.prevBlock) !== this._tip.hash; }; BlockService.prototype._getBlock = function(hash, callback) { @@ -354,53 +358,13 @@ BlockService.prototype._getHash = function(blockArg, callback) { }; -BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback) { +BlockService.prototype.onReorg = function(args, callback) { var self = this; - var reorgHeader = allHeaders.get(reorgHash); - assert(reorgHeader, 'We were asked to reorg to a non-existent hash.'); + var block = args[1][0]; - self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHeader) { - - if (err) { - return callback(err); - } - - assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.'); - - // if we are syncing and we haven't sync'ed to the common ancestor hash, we can safely ignore this reorg - if (self._tip.height < commonAncestorHeader.height) { - return callback(); - } - - var reorgHeaders = self._getOldHeaders(commonAncestorHeader, allHeaders); - - assert(reorgHeaders, 'Expected to have reorg headers to remove'); - - log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + - ' and height: ' + commonAncestorHeader.height + '. Removing a total of: ' + reorgHeaders.length + ' block(s).'); - - self._processReorg(commonAncestorHeader, reorgHeaders, function(err) { - - if(err) { - return callback(err); - } - - callback(); - - }); - - }); - -}; - -// this JUST rewinds the chain back to the common ancestor block, nothing more -BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, callback) { - - // set the tip to the common ancestor in case something goes wrong with the reorg - var self = this; - self._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }); + self._setTip({ hash: block.rhash(), height: self._tip.height - 1 }); var tipOps = utils.encodeTip(self._tip, self.name); var removalOps = [{ @@ -409,148 +373,249 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, c value: tipOps.value }]; - // remove all the old blocks that we reorg from - oldBlockList.forEach(function(block) { - removalOps.push({ - type: 'del', - key: self._encoding.encodeBlockKey(block.rhash()), + removalOps.push({ + type: 'del', + key: self._encoding.encodeBlockKey(block.rhash()), + }); + + setImmediate(function() { + callback(null, removalOps); + }); +}; + +BlockService.prototype._onReorg = function(commonAncestorHash, block, callback) { + + var self = this; + var services = self.node.services; + + async.mapSeries(services, function(service, next) { + + if(!service.onReorg) { + return setImmediate(next); + } + + service.onReorg.call(service, [commonAncestorHash, [block]], next); + + }, callback); + +}; + +BlockService.prototype._removeAllSubscriptions = function() { + this._bus.unsubscribe('p2p/block'); + this._bus.removeAllListeners(); + this._subscribedBlock = false; +}; + +BlockService.prototype.onHeaders = function(callback) { + + var self = this; + + async.retry(function(next) { + + next(self._blockProcessor.length() !== 0); + + }, function() { + + self._checkTip(function(err) { + + if(err) { + return callback(err); + } + + self._startSync(); + callback(); + }); }); - self._db.batch(removalOps, callback); - }; -BlockService.prototype._onAllHeaders = function() { - // once the header service has all of its headers, we know we can check our - // own tip for consistency and make sure our it is on the mainchain - var self = this; +BlockService.prototype._startBlockSubscription = function() { - if (!self._initialSync) { - return self._sync(); + if (this._subscribedBlock) { + return; } - self._checkTip(function(err) { + this._subscribedBlock = true; - if(err) { - log.error(err); - return self.node.stop(); - } - - self._startSync(); - - }); + log.info('Block Service: starting p2p block subscription.'); + this._bus.on('p2p/block', this._queueBlock.bind(this)); + this._bus.subscribe('p2p/block'); }; -BlockService.prototype._processReorg = function(commonAncestorHeader, reorgHeaders, callback) { +BlockService.prototype._handleReorg = function(commonAncestorHash, callback) { + var self = this; + + log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' + + self._tip.hash + ' common ancestor hash: ' + commonAncestorHash); + var operations = []; - var services = self.node.services; + var tip = self._tip; + var blockCount = 0; - // process one block at a time just in case we have a huge amount of blocks to back out - async.eachSeries(reorgHeaders, function(header, next) { + // we don't know how many blocks we need to remove until we've reached the common ancestor + async.whilst( + function() { + return tip.hash !== commonAncestorHash; + }, - async.waterfall([ - function(next) { - self.getBlock(header.hash, next); - }, - function(block, next) { - if (!block) { - return next(new Error('block not found for reorg.')); - } - self._timestamp.getTimestamp(header.hash, function(err, timestamp) { - if (err || !timestamp) { - return next(err || new Error('timestamp missing from reorg.')); - } - block.__height = header.height; - block.__ts = timestamp; + function(next) { + async.waterfall([ + + self._getReorgBlock.bind(self, tip), + + function(block, next) { + tip = { + hash: bcoin.util.revHex(block.prevBlock), + height: tip.height - 1 + }; next(null, block); - }); - }, - function(block, next) { - async.eachSeries(services, function(mod, next) { - if(mod.onReorg) { - mod.onReorg.call(mod, [commonAncestorHeader, [block]], function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - operations = operations.concat(ops); - } - self._onReorg(commonAncestorHeader, [block], next); - }); - } else { - setImmediate(next); - } - }, next); - } - ], next); - }, function(err) { + }, - if (err) { - return callback(err); - } + function(block, next) { + self._onReorg(commonAncestorHash, block, next); + } - self._db.batch(operations, callback); - - }); - -}; - -BlockService.prototype._processBlock = function(block) { - - var self = this; - var operations = []; - var services = self.node.services; - - async.eachSeries( - services, - function(mod, next) { - if(mod.onBlock) { - mod.onBlock.call(mod, block, function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - operations = operations.concat(ops); - } - next(); - }); - } else { - setImmediate(next); - } + ], function(err, ops) { + if(err) { + return next(err); + } + blockCount++; + operations = operations.concat(ops); + next(); + }); }, function(err) { if (err) { - return self._handleError(err); + return callback(err); } - self._db.batch(operations, function(err) { + log.info('Block Service: removed ' + blockCount + ' block(s) during the reorganization event.'); + self._db.batch(_.compact(_.flattenDeep(operations)), callback); - if (err) { - return self._handleError(err); - } + }); +}; - self._tip.height = self._tip.height + 1; - self._tip.hash = block.rhash(); - var tipOps = utils.encodeTip(self._tip, self.name); +BlockService.prototype._getReorgBlock = function(tip, callback) { - self._db.put(tipOps.key, tipOps.value, function(err) { + var self = this; - if (err) { - return self._handleError(err); - } + self._getBlock(tip.hash, function(err, block) { - self._syncing = false; - self._reorging = false; - self._sync(); - }); - }); + if (err || !block) { + return callback(err || new Error('block not found for reorg.')); } - ); + + self._timestamp.getTimestamp(tip.hash, function(err, timestamp) { + + if (err || !timestamp) { + return callback(err || new Error('timestamp missing from reorg.')); + } + + block.__height = tip.height; + block.__ts = timestamp; + callback(null, block); + }); + + }); + +}; + +BlockService.prototype._onBlock = function(block, callback) { + + var self = this; + self._getBlock(block.rhash(), function(err, _block) { + + if(err) { + return self._handleError(err); + } + + if (_block) { + log.debug('Block Service: not syncing, block already in database.'); + return setImmediate(callback); + } + + self._processBlock(block, callback); + + }); +}; + +BlockService.prototype._processBlock = function(block, callback) { + + var self = this; + + if (self.node.stopping) { + return callback(); + } + + log.debug('Block Service: new block: ' + block.rhash()); + + // common case + if (!self._detectReorg(block)) { + return setImmediate(function() { + self._saveBlock(block, callback); + }); + } + + // reorg + self._handleReorg(bcoin.util.revHex(block.prevBlock), function(err) { + if(err) { + return callback(err); + } + self._saveBlock(block, callback); + }); + +}; + +BlockService.prototype._saveBlock = function(block, callback) { + + var self = this; + block.__height = self._tip.height + 1; + + var services = self.node.services; + + async.mapSeries(services, function(service, next) { + + if(!service.onBlock) { + return setImmediate(next); + } + + service.onBlock.call(service, block, next); + + }, function(err, ops) { + if (err) { + return callback(err); + } + + self._db.batch(_.compact(_.flattenDeep(ops)), function(err) { + if (err) { + return callback(err); + } + + self._setTip({ hash: block.rhash(), height: self._tip.height + 1 }); + var tipOps = utils.encodeTip(self._tip, self.name); + + self._db.put(tipOps.key, tipOps.value, function(err) { + if(err) { + return callback(err); + } + callback(); + }); + }); + }); +}; + +BlockService.prototype._onBestHeight = function(height) { + log.info('Block Service: Best Height is: ' + height); + this._removeAllSubscriptions(); +}; + +BlockService.prototype._setListeners = function() { + this._p2p.on('bestHeight', this._onBestHeight.bind(this)); }; BlockService.prototype._handleError = function(err) { @@ -560,6 +625,20 @@ BlockService.prototype._handleError = function(err) { } }; +BlockService.prototype._syncBlock = function(block) { + var self = this; + + self._saveBlock(block, function(err) { + if(err) { + return self._handleError(err); + } + if (self._tip.height < self._header.getLastHeader().height) { + return self.emit('next block'); + } + self.emit('synced'); + }); +}; + BlockService.prototype.onBlock = function(block, callback) { var self = this; @@ -572,117 +651,88 @@ BlockService.prototype.onBlock = function(block, callback) { }); }; -BlockService.prototype.newBlock = function(block) { - - if (this.node.stopping || this._tip.hash === block.rhash()) { - return; - } - - // this service must receive blocks in order - var prevHash = bcoin.util.revHex(block.prevBlock); - - if (this._tip.hash !== prevHash) { - return log.warn('received a block that was not asked for and not linked directly to our tip.'); - } - - log.debug('Block Service: new block: ' + block.rhash()); - block.__height = this._tip.height + 1; - this._processBlock(block); - -}; - -BlockService.prototype._setListeners = function() { - this._header.on('headers', this._onAllHeaders.bind(this)); -}; - -BlockService.prototype.newReorg = function(block, headers, callback) { - var self = this; - self._reorging = true; - log.info('Block Service: detected a reorg from the header service.'); - self._handleReorg(block.rhash(), headers, function(err) { - if (err) { - return callback(err); - } - self.newBlock(block); - - async.retry({ interval: 100, times: 1000 }, function(next) { - next(self._reorging); - }, function(err) { - if (err) { - return callback(err); - } - callback(); - }); - - }); -}; - BlockService.prototype._setTip = function(tip) { log.debug('Block Service: Setting tip to height: ' + tip.height); log.debug('Block Service: Setting tip to hash: ' + tip.hash); this._tip = tip; }; +BlockService.prototype._onSynced = function() { + var self = this; + self._logProgress(); + self._initialSync = false; + self._startBlockSubscription(); + log.info('Block Service: The best block hash is: ' + self._tip.hash + + ' at height: ' + self._tip.height); +}; + BlockService.prototype._startSync = function() { - this._numNeeded = this._header.getLastHeader().height - this._tip.height; + var numNeeded = Math.max(this._header.getLastHeader().height - this._tip.height, 0); - log.info('Block Service: Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.'); + log.info('Block Service: Gathering: ' + numNeeded + ' block(s) from the peer-to-peer network.'); + + if (numNeeded > 0) { + this.on('next block', this._sync.bind(this)); + this.on('synced', this._onSynced.bind(this)); + return this._sync(); + } + + this._onSynced(); - this._sync(); }; BlockService.prototype._sync = function() { var self = this; - if (self.node.stopping || self._syncing) { + if (self.node.stopping) { return; } - self._syncing = true; - - var lastHeaderIndex = self._header.getLastHeader().height; - - if (self._tip.height < lastHeaderIndex) { - - if (self._tip.height % 144 === 0) { - log.info('Block Service: Blocks download progress: ' + - self._tip.height + '/' + lastHeaderIndex + - ' (' + self._syncPercentage() + '%)'); - } - - return self._header.getNextHash(self._tip, function(err, targetHash, nextHash) { - - if(err) { - log.error(err); - self.node.stop(); - return; - } - - self._header.lastBlockQueried = targetHash; - - // to ensure that we can receive blocks that were previously delivered - // this will lead to duplicate transactions being sent - self._p2p.clearInventoryCache(); - -console.log('target hash: ' + targetHash); - self._p2p.getP2PBlock({ - filter: { - startHash: self._tip.hash, - endHash: nextHash - }, - blockHash: targetHash - }, self.newBlock.bind(self)); - - }); - + if (self._tip.height % 144 === 0) { + self._logProgress(); } - this._syncing = false; - this._initialSync = false; - log.info('Block Service: The best block hash is: ' + self._tip.hash + - ' at height: ' + self._tip.height); + self._header.getNextHash(self._tip, function(err, targetHash, nextHash) { + + if(err) { + return self._handleError(err); + } + + // to ensure that we can receive blocks that were previously delivered + // this will lead to duplicate transactions being sent + self._p2p.clearInventoryCache(); + + self._p2p.getP2PBlock({ + filter: { + startHash: self._tip.hash, + endHash: nextHash + }, + blockHash: targetHash + }, self._syncBlock.bind(self)); + + }); + +}; + +BlockService.prototype._logProgress = function() { + + if (!this._initialSync) { + return; + } + + var progress; + var bestHeight = Math.max(this._header.getBestHeight(), this._tip.height); + + if (bestHeight === 0) { + progress = 0; + } else { + progress = (this._tip.height/bestHeight*100.00).toFixed(2); + } + + log.info('Block Service: download progress: ' + this._tip.height + '/' + + bestHeight + ' (' + progress + '%)'); }; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index c7377fca..de86d4a2 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -27,7 +27,6 @@ var HeaderService = function(options) { this._checkpoint = options.checkpoint || 2000; // set to -1 to resync all headers. this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._lastHeader = null; - this.lastBlockQueried = null; this._initialSync = true; }; @@ -209,8 +208,6 @@ HeaderService.prototype.start = function(callback) { self._setListeners(); self._bus = self.node.openBus({remoteAddress: 'localhost-header'}); - self._startHeaderSubscription(); - callback(); }); @@ -230,6 +227,11 @@ HeaderService.prototype.stop = function(callback) { HeaderService.prototype._startHeaderSubscription = function() { + if (this._subscribedHeaders) { + return; + } + this._subscribedHeaders = true; + log.info('Header Service: subscribed to p2p headers.'); this._bus.on('p2p/headers', this._onHeaders.bind(this)); this._bus.subscribe('p2p/headers'); @@ -251,15 +253,11 @@ HeaderService.prototype.getPublishEvents = function() { HeaderService.prototype._queueBlock = function(block) { var self = this; - if (block.rhash() === self.lastBlockQueried) { - return; - } self._blockProcessor.push(block, function(err) { if (err) { - log.error(err); - return self.node.stop(); + return self._handleError(err); } log.debug('Header Service: completed processing block: ' + block.rhash() + ' prev hash: ' + bcoin.util.revHex(block.prevBlock)); @@ -272,21 +270,21 @@ HeaderService.prototype._processBlocks = function(block, callback) { var self = this; - assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.'); + if (self.node.stopping) { + return callback(); + } - self._persistHeader(block, function(err) { - - if (err) { - return callback(err); + self.getBlockHeader(block.rhash(), function(err, header) { + if(err) { + return self._handleError(err); } - async.eachSeries(self.node.services, function(mod, next) { - if (!mod.newBlock) { - return setImmediate(next); - } - mod.newBlock.call(mod, block, next); - }, callback); + if (header) { + log.debug('Header Service: block already exists in data set.'); + return callback(); + } + self._persistHeader(block, callback); }); }; @@ -302,9 +300,7 @@ HeaderService.prototype._persistHeader = function(block, callback) { } if (!commonHeader) { - return self._syncBlock(block, callback); - } self._handleReorg(block, commonHeader, function(err) { @@ -314,7 +310,6 @@ HeaderService.prototype._persistHeader = function(block, callback) { } self._syncBlock(block, callback); - }); }); @@ -338,16 +333,7 @@ HeaderService.prototype._syncBlock = function(block, callback) { log.debug('Header Service: new block: ' + block.rhash()); - self._saveHeaders(self._onHeader(header), function(err) { - - if (err) { - return callback(err); - } - - self._onHeadersSave(); - callback(); - }); - + self._saveHeaders(self._onHeader(header), callback); }; HeaderService.prototype._broadcast = function(block) { @@ -418,17 +404,21 @@ HeaderService.prototype._onHeaders = function(headers) { self._saveHeaders(dbOps, function(err) { if (err) { - log.error(err); - return self.node.stop(); + return self._handleError(err); } - self._onHeadersSave(); }); }; +HeaderService.prototype._handleError = function(err) { + log.error(err); + this.node.stop(); +}; + HeaderService.prototype._saveHeaders = function(dbOps, callback) { - var tipOps = utils.encodeTip(this._tip, this.name); + var self = this; + var tipOps = utils.encodeTip(self._tip, self.name); dbOps.push({ type: 'put', @@ -436,37 +426,52 @@ HeaderService.prototype._saveHeaders = function(dbOps, callback) { value: tipOps.value }); - this._db.batch(dbOps, callback); + self._db.batch(dbOps, function(err) { + if(err) { + return callback(err); + } + self._onHeadersSave(callback); + }); }; -HeaderService.prototype._onHeadersSave = function(err) { - +HeaderService.prototype._onHeadersSave = function(callback) { var self = this; - if (err) { - log.error(err); - self.node.stop(); - return; - } - self._logProgress(); if (!self._syncComplete()) { - self._sync(); - return; - + return callback(); } + self._endHeaderSubscription(); self._startBlockSubscription(); self._setBestHeader(); + if (!self._initialSync) { + return callback(); + } + + log.info('Header Service: sync complete.'); self._initialSync = false; - self.emit('headers'); + async.eachSeries(self.node.services, function(service, next) { + if (service.onHeaders) { + return service.onHeaders.call(service, next); + } + setImmediate(next); + }, callback); }; +HeaderService.prototype._endHeaderSubscription = function() { + if (this._subscribedHeaders) { + this._subscribedHeaders = false; + log.info('Header Service: p2p header subscription no longer needed, unsubscribing.'); + this._bus.unsubscribe('p2p/headers'); + } +}; + HeaderService.prototype._startBlockSubscription = function() { if (this._subscribedBlock) { @@ -475,6 +480,7 @@ HeaderService.prototype._startBlockSubscription = function() { this._subscribedBlock = true; + log.info('Header Service: starting p2p block subscription.'); this._bus.on('p2p/block', this._queueBlock.bind(this)); this._bus.subscribe('p2p/block'); @@ -501,7 +507,6 @@ HeaderService.prototype._getHeader = function(height, hash, callback) { return callback(new Error('invalid arguments')); } - var key; if (hash) { key = self._encoding.encodeHeaderHashKey(hash); @@ -537,20 +542,7 @@ HeaderService.prototype._detectReorg = function(block, callback) { return callback(null, false); } - // first we check if the new block's prev hash is already in our data set - // if it is and this block isn't already in our data set too, then we have a reorg - async.waterfall([ - function(next) { - self.getBlockHeader(block.rhash(), next); - }, - function(header, next) { - if (header) { - return callback(null, false); - } - self.getBlockHeader(prevHash, next); - } - ], function(err, header) { - + self.getBlockHeader(prevHash, function(err, header) { if (err) { return callback(err); } @@ -588,20 +580,7 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) { headers.set(hash, reorgHeader); // appends to the end // this will ensure our own headers collection is correct - self._onReorg(reorgHeader, headers, commonHeader, function(err) { - - if (err) { - return callback(err); - } - - async.eachSeries(self.node.services, function(mod, next) { - if (!mod.newReorg) { - return setImmediate(next); - } - mod.newReorg.call(mod, block, headers, next); - }, callback); - }); - + self._onReorg(reorgHeader, headers, commonHeader, callback); }); }; @@ -611,7 +590,9 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, var ops = []; var startingHeight = this._tip.height; var hash = this._tip.hash; + var headerCount = 0; while(hash !== commonHeader.hash) { + headerCount++; var header = headers.getIndex(startingHeight--); assert(header, 'Expected to have a header at this height, but did not. Reorg failed.'); hash = header.prevHash; @@ -629,17 +610,16 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader, this._tip.height = commonHeader.height; this._lastHeader = commonHeader; + log.info('Header Service: removed ' + headerCount + ' header(s) during the reorganization event.'); this._db.batch(ops, callback); }; HeaderService.prototype._setListeners = function() { - this._p2p.on('bestHeight', this._onBestHeight.bind(this)); - }; HeaderService.prototype._onBestHeight = function(height) { - log.debug('Header Service: Best Height is: ' + height); + log.info('Header Service: Best Height is: ' + height); this._bestHeight = height; this._startSync(); }; @@ -656,25 +636,37 @@ HeaderService.prototype._startSync = function() { // ensure the blockProcessor is finished processing blocks (empty queue) // then proceed with gathering new set(s) of headers - self._bus.unsubscribe('p2p/block'); + self._initialSync = true; + log.debug('Header Service: starting sync routines, ensuring no pre-exiting subscriptions to p2p blocks.'); + self._removeAllSubscriptions(); async.retry(function(next) { - next(self._blockProcessor.length !== 0); + next(self._blockProcessor.length() !== 0); }, function() { - self._numNeeded = self._bestHeight - self._tip.height; + var numNeeded = self._bestHeight - self._tip.height; // common case - if (self._numNeeded > 0) { - log.info('Header Service: Gathering: ' + self._numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); - self._sync(); - } else if (self._numNeeded < 0) { - // this should be very uncommon - self._handleLowTipHeight(); + if (numNeeded > 0) { + log.info('Header Service: Gathering: ' + numNeeded + ' ' + 'header(s) from the peer-to-peer network.'); + return self._sync(); } + // next most common case + if (numNeeded === 0) { + log.info('Header Service: we seem to be already synced with the peer.'); + return self._onHeadersSave(function(err) { + if(err) { + return self._handleError(err); + } + }); + } + + // this should be very uncommon + self._handleLowTipHeight(); + }); }; @@ -682,6 +674,8 @@ HeaderService.prototype._startSync = function() { HeaderService.prototype._removeAllSubscriptions = function() { this._bus.unsubscribe('p2p/headers'); this._bus.unsubscribe('p2p/block'); + this._subscribedBlock = false; + this._subscribedHeaders = false; this._bus.removeAllListeners(); }; @@ -698,6 +692,8 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) { return callback(err); } + log.warn('Header Service: re-subscribing to p2p headers to gather new peer\'s headers.'); + self._subscribedHeaders = true; self._bus.subscribe('p2p/headers'); self._bus.on('p2p/headers', function(headers) { @@ -721,8 +717,6 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) { if (oldHeader) { - self._removeAllSubscriptions(); - // we found a common header, but no headers that at a greater height, this peer is not synced if (!reorgInfo.blockHash) { return callback(); @@ -755,19 +749,22 @@ HeaderService.prototype._handleLowTipHeight = function() { self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' + ' Checking the new peer\'s headers for a reorganization event.'); - self._removeAllSubscriptions(); - self._findReorgConditionInNewPeer(function(err, reorgInfo) { if (err) { - log.error(err); - return self.node.stop(); + return self._handleError(err); } // Our peer is not yet sync'ed. // We will just turn on our block subscription and wait until we get a block we haven't seen if (!reorgInfo) { - self._onHeadersSave(); + log.info('Header Service: it appears that our peer is not yet synchronized with the network ' + + '(we have a strict superset of the peer\'s blocks). We will wait for more blocks to arrive...'); + return self._onHeadersSave(function(err) { + if (err) { + return self._handleError(err); + } + }); } // our peer has reorg'ed to lower overall height. @@ -783,19 +780,15 @@ HeaderService.prototype._handleLowTipHeight = function() { self._handleReorg(block, reorgInfo.commonHeader, function(err) { if(err) { - log.error(err); - return self.node.stop(); + self._handleError(err); } self._syncBlock(block, function(err) { if (err) { - log.error(err); - return self.node.stop(); + self._handleError(err); } - self._startHeaderSubscription(); - }); }); @@ -844,6 +837,7 @@ HeaderService.prototype._getP2PHeaders = function(hash) { HeaderService.prototype._sync = function() { + this._startHeaderSubscription(); this._getP2PHeaders(this._tip.hash); }; diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 0248f19b..f780e4bf 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -51,8 +51,6 @@ P2P.prototype.getNumberOfPeers = function() { P2P.prototype.getP2PBlock = function(opts, callback) { // opts is { filter: {}, blockHash: block hash we want } - this._currentRequest = { opts: opts, callback: callback }; - var peer = this._getPeer(); var blockFilter = this._setResourceFilter(opts.filter, 'blocks'); @@ -251,7 +249,7 @@ P2P.prototype._initPool = function() { opts.listenAddr = false; opts.maxPeers = this._maxPeers; opts.network = this.node.network; - p2p.Pool.RetrySeconds = 5; + p2p.Pool.RetrySeconds = 3; this._pool = new p2p.Pool(opts); }; @@ -341,13 +339,6 @@ P2P.prototype._onPeerReady = function(peer, addr) { this._addPeer(peer); var bestHeight = this._getBestHeight(); - // if we have a current request, then we will restart that - if (this._currentRequest) { - log.info('Restarting last query'); - this.clearInventoryCache(); - this.getP2PBlock(this._currentRequest.opts, this._currentRequest.callback); - } - if (bestHeight >= 0) { this.emit('bestHeight', bestHeight); } diff --git a/lib/services/timestamp/index.js b/lib/services/timestamp/index.js index a4ab09f7..48417e8e 100644 --- a/lib/services/timestamp/index.js +++ b/lib/services/timestamp/index.js @@ -121,7 +121,7 @@ TimestampService.prototype.onBlock = function(block, callback) { TimestampService.prototype.onReorg = function(args, callback) { var self = this; - var commonAncestorHeader = args[0]; + var commonAncestorHash = args[0]; var oldBlockList = args[1]; var removalOps = []; @@ -141,7 +141,7 @@ TimestampService.prototype.onReorg = function(args, callback) { }); // look up the adjusted timestamp from our own database and set the lastTimestamp to it - self.getTimestamp(commonAncestorHeader.hash, function(err, timestamp) { + self.getTimestamp(commonAncestorHash, function(err, timestamp) { if (err) { return callback(err); @@ -158,13 +158,19 @@ TimestampService.prototype.getTimestampSync = function(hash) { }; TimestampService.prototype.getTimestamp = function(hash, callback) { + var self = this; + self._db.get(self._encoding.encodeBlockTimestampKey(hash), function(err, data) { if (err) { return callback(err); } + if (!data) { + return callback(); + } callback(null, self._encoding.decodeBlockTimestampValue(data)); }); + }; TimestampService.prototype.getHash = function(timestamp, callback) { diff --git a/test/services/block/index.unit.js b/test/services/block/index.unit.js index 9d5b9d42..800b9d9a 100644 --- a/test/services/block/index.unit.js +++ b/test/services/block/index.unit.js @@ -6,6 +6,7 @@ var sinon = require('sinon'); var bcoin = require('bcoin'); var Block = bcoin.block; var Encoding = require('../../../lib/services/block/encoding'); +var utils = require('../../../lib/utils'); describe('Block Service', function() { @@ -43,53 +44,24 @@ describe('Block Service', function() { }); }); - describe('#_getOldBlocks', function() { - - it('should get the old block list (blocks to be removed from indexes)', function(done) { - - blockService._tip = { height: 10, hash: 'cc' }; - - var header = { height: 8, hash: 'aa' }; - var getStub = sandbox.stub(); - - getStub.onCall(0).returns({ hash: 'bb', height: 9 }); - getStub.onCall(1).returns({ hash: 'aa', height: 8 }); - - var allHeaders = { get: getStub }; - sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, {}); - - blockService._timestamp = { getTimestamp: sinon.stub().callsArgWith(1, null, 1234) }; - - blockService._getOldBlocks(header, allHeaders, function(err, blocks) { - if (err) { - return done(err); - } - expect(blocks.length).to.equal(1); - expect(blocks[0].__height).to.equal(9); - expect(blocks[0].__ts).to.equal(1234); - done(); - }); - - }); - }); - describe('#_findCommonAncestor', function() { it('should find the common ancestor between the current chain and the new chain', function(done) { - sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, block1); - sandbox.stub(blockService, '_getOldBlocks').callsArgWith(2, null, [block2]); + sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, block2); + blockService._tip = { hash: 'aa' }; + var headers = new utils.SimpleMap(); + headers.set('aa', 'someheader'); - var allHeaders = { get: sandbox.stub().returns({ hash: 'somecommonancestor', prevHash: block1.rhash() }), size: 1e6 }; + blockService._header = { getAllHeaders: sandbox.stub().callsArgWith(0, null, headers) }; - blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) { + blockService._findCommonAncestor(function(err, commonAncestorHashActual) { if(err) { return done(err); } - expect(commonAncestorHashActual).to.equal('somecommonancestor'); - expect(oldBlockList).to.deep.equal([block2]); + expect(commonAncestorHashActual).to.equal('aa'); done(); }); @@ -127,31 +99,40 @@ describe('Block Service', function() { describe('#_onBlock', function() { - it('should process blocks', function() { - var processBlock = sandbox.stub(blockService, '_processBlock'); - blockService._tip = { hash: block1.rhash(), height: 1 }; - blockService._header = { blockServiceSyncing: false }; - blockService._onBlock(block2); - expect(processBlock.calledOnce).to.be.true; + it('should process blocks', function(done) { + var getBlock = sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, null); + var processBlock = sandbox.stub(blockService, '_processBlock').callsArgWith(1, null); + blockService._onBlock(block2, function(err) { + if(err) { + return done(err); + } + expect(processBlock.calledOnce).to.be.true; + expect(getBlock.calledOnce).to.be.true; + done(); + }); }); - it('should not process blocks', function() { + it('should not process blocks', function(done) { + var getBlock = sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, block2); var processBlock = sandbox.stub(blockService, '_processBlock'); - blockService._tip = { hash: block2.rhash(), height: 1 }; - blockService._header = { blockServiceSyncing: false }; - blockService._onBlock(block1); - expect(processBlock.calledOnce).to.be.false; + blockService._onBlock(block2, function(err) { + if(err) { + return done(err); + } + expect(getBlock.calledOnce).to.be.true; + expect(processBlock.called).to.be.false; + done(); + }); }); - }); describe('#_setListeners', function() { - it('should set listeners for headers, reorg', function() { + it('should set listeners for best height', function() { var on = sandbox.stub(); - blockService._header = { on: on }; + blockService._p2p = { on: on }; blockService._setListeners(); - expect(on.calledTwice).to.be.true; + expect(on.calledOnce).to.be.true; }); }); @@ -167,24 +148,10 @@ describe('Block Service', function() { }); - describe('#_startSubscriptions', function() { - it('should start the subscriptions if not already subscribed', function() { - var on = sinon.stub(); - var subscribe = sinon.stub(); - var openBus = sinon.stub().returns({ on: on, subscribe: subscribe }); - blockService.node = { openBus: openBus }; - blockService._startSubscriptions(); - expect(blockService._subscribed).to.be.true; - expect(openBus.calledOnce).to.be.true; - expect(on.calledOnce).to.be.true; - expect(subscribe.calledOnce).to.be.true; - }); - }); - describe('#_startSync', function() { - it('should start the sync of blocks if type set', function() { - blockService._header = { getLastHeader: sinon.stub.returns({ height: 100 }) }; + it('should start the sync of blocks', function() { + blockService._header = { getLastHeader: sinon.stub().returns({ height: 100 }) }; blockService._tip = { height: 98 }; var sync = sandbox.stub(blockService, '_sync'); blockService._startSync(); @@ -200,6 +167,7 @@ describe('Block Service', function() { var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 1, hash: 'aa' }); var setListeners = sandbox.stub(blockService, '_setListeners'); var setTip = sandbox.stub(blockService, '_setTip'); + blockService.node = { openBus: sandbox.stub() }; blockService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip }; blockService.start(function() { expect(blockService._encoding).to.be.an.instanceof(Encoding); diff --git a/test/services/header/index.unit.js b/test/services/header/index.unit.js index ae2466d5..fd13ac47 100644 --- a/test/services/header/index.unit.js +++ b/test/services/header/index.unit.js @@ -51,7 +51,6 @@ describe('Header Service', function() { headerService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip, batch: sinon.stub() }; headerService.start(function() { - expect(_startHeaderSubscription.calledOnce).to.be.true; expect(setGenesisBlock.calledOnce).to.be.true; expect(getLastHeader.calledOnce).to.be.false; expect(setListeners.calledOnce).to.be.true; @@ -101,31 +100,27 @@ describe('Header Service', function() { describe('#_startSync', function() { it('should start the sync process', function() { - headerService._bestHeight = 123; - var unsub = sinon.stub(); - headerService._tip = { height: 120 }; - headerService._bus = { unsubscribe: unsub }; - headerService._blockProcessor = { length: 0 }; - var getHeaders = sandbox.stub(); - headerService._p2p = { getHeaders: getHeaders }; - headerService._lastHeader = { height: 124 }; + var removeAllSubs = sandbox.stub(headerService, '_removeAllSubscriptions'); + headerService._blockProcessor = { length: sinon.stub().returns(0) }; + headerService._bestHeight = 100; + headerService._tip = { height: 98 }; + var sync = sandbox.stub(headerService, '_sync'); headerService._startSync(); - expect(unsub.calledOnce).to.be.true; - expect(getHeaders.calledOnce).to.be.true; + expect(removeAllSubs.calledOnce).to.be.true; + expect(sync.calledOnce).to.be.true; }); }); describe('#_sync', function() { - it('should sync header', function() { - headerService._numNeeded = 1000; - headerService._tip = { height: 121, hash: 'a' }; - var getHeaders = sandbox.stub(); - headerService._p2p = { getHeaders: getHeaders }; - headerService._lastHeader = { height: 124 }; + it('should sync headers', function() { + var startHeaderSub = sandbox.stub(headerService, '_startHeaderSubscription'); + var getP2PHeaders = sandbox.stub(headerService, '_getP2PHeaders'); + headerService._tip = { hash: 'aa' }; headerService._sync(); - expect(getHeaders.calledOnce).to.be.true; + expect(getP2PHeaders.calledOnce).to.be.true; + expect(startHeaderSub.calledOnce).to.be.true; }); });