diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 87f74f9d..945b8b8f 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -22,22 +22,21 @@ var BlockService = function(options) { this._p2p = this.node.services.p2p; this._header = this.node.services.header; this._timestamp = this.node.services.timestamp; + this._mempool = this.node.services.mempool; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._initialSync = false; this._serviceIniting = false; this._blocksInQueue = 0; - this._recentBlockHashesCount = options.recentBlockHashesCount || 50; // if you expect this chain to reorg deeper than 50, set this + this._recentBlockHashesCount = options.recentBlockHashesCount || 1000; // block service won't reorg past this point this._recentBlockHashes = new LRU(this._recentBlockHashesCount); this._readAheadBlockCount = options.readAheadBlockCount || 2; // this is the number of blocks to direct the p2p service to read aheead - this._mempool = this.node.services.mempool; }; inherits(BlockService, BaseService); BlockService.dependencies = [ 'timestamp', 'p2p', 'db', 'header', 'mempool' ]; -// --- public prototype functions BlockService.prototype.getAPIMethods = function() { var methods = [ ['getInfo', this, this.getInfo, 0], @@ -178,68 +177,11 @@ BlockService.prototype._checkTip = function(callback) { return callback(); } - self._findCommonAncestorAndBlockHashesToRemove(function(err, commonAncestorHeader, hashesToRemove) { - - if (err) { - return callback(err); - } - - self._handleReorg(commonAncestorHeader, hashesToRemove, callback); - }); + self._handleReorg(callback); }); }; -BlockService.prototype._findCommonAncestorAndBlockHashesToRemove = function(callback) { - - var self = this; - - var hashes = [{ - hash: self._tip.hash, - height: self._tip.height - }]; - - var header; - var iterCount = 0; - - async.until(function() { - - return header || iterCount++ >= self._recentBlockHashesCount; - - }, function(next) { - - var hash = self._recentBlockHashes.get(hash); - - hashes.push({ - tip: hash, - height: hashes[hashes.length - 1].height - 1 - }); - - self._header.getBlockHeader(hash, function(err, _header) { - - if (err) { - return next(err); - } - - header = _header; - next(); - }); - - }, function(err) { - - if (err) { - return callback(err); - } - - // ensure the common ancestor hash is not in the blocks to remove hashes - hashes.pop(); - assert(hashes.length >= 1, 'Block Service: we expected to remove at least one block, but we did not have at least one block.'); - callback(null, header, hashes); - - }); - -}; - BlockService.prototype._resetTip = function(callback) { var self = this; @@ -373,10 +315,13 @@ BlockService.prototype.start = function(callback) { }); self._setTip(tip, function(err) { + if (err) { return callback(err); } + self._loadRecentBlockHashes(callback); + }); }); @@ -388,13 +333,15 @@ BlockService.prototype._loadRecentBlockHashes = function(callback) { var self = this; var hash = self._tip.hash; + var times = Math.min(self._tip.height, self._recentBlockHashesCount); - async.times(Math.min(self._tip.height, self._recentBlockHashesCount), function(n, next) { + async.timesSeries(times, function(n, next) { self.getBlock(hash, function(err, block) { - if (err) { - return callback(err); + if (err || !block) { + return callback(err || new Error('Block Service: attempted to retrieve block: ' + hash + + ' but was not in the index.')); } var prevHash = bcoin.util.revHex(block.prevBlock); @@ -410,7 +357,8 @@ BlockService.prototype._loadRecentBlockHashes = function(callback) { return callback(err); } - log.info('Block Service: loaded: ' + self._recentBlockHashesCount + ' hashes from the index.'); + assert(self._recentBlockHashes.length === times, 'Block Service: did not load enough recent block hashes from the index.'); + log.info('Block Service: loaded: ' + self._recentBlockHashes.length + ' hashes from the index.'); callback(); }); @@ -472,8 +420,6 @@ BlockService.prototype.syncPercentage = function(callback) { callback(null, this._syncPercentage()); }; -// --- start private prototype functions - BlockService.prototype._detectReorg = function(block) { return bcoin.util.revHex(block.prevBlock) !== this._tip.hash; }; @@ -656,57 +602,202 @@ BlockService.prototype._saveTip = function(tip, callback) { this._db.put(tipOps.key, tipOps.value, callback); }; -BlockService.prototype._handleReorg = function(commonAncestorHeader, hashesToRemove, callback) { +// the header service has the authoritative list of block headers. +// we know we have a tip that is not correct with respect to this. +// so we'll use our recent block hashes cache to find the hash that matches +// into the header service list. +BlockService.prototype._findLatestValidBlockHeader = function(callback) { + + var self = this; + + var blockServiceHash = self._tip.hash; + var blockServiceHeight = self._tip.height; + var iterCount = 0; + var header; + + async.until(function() { + + return iterCount++ >= self._recentBlockHashes.length || header; + + }, function(next) { + + self._header.getBlockHeader(blockServiceHash, function(err, _header) { + + if (err) { + return next(err); + } + + if (!_header) { + return next(); + } + + if (_header.hash === blockServiceHash && _header.height === blockServiceHeight--) { + header = _header; + } + + blockServiceHash = self._recentBlockHashes.get(blockServiceHash); + next(); + + }); + }, function(err) { + + if (err) { + return callback(err); + } + + // the header could be undefined + // this means that the header service has no record of + // any of our recent block hashes in its indexes. + // this means we've reorged deeper than recentBlockHashesCount number + // of blocks or the header service connected to the wrong chain. + // either way, we can't continue + assert(header, 'Block Service: we could not locate any of our recent block hashes in the header service ' + + 'index. Perhaps our header service sync\'ed to the wrong chain?'); + + assert(header.height <= self._tip.height, 'Block Service: we found a common ancestor header whose ' + + 'height was greater than our current tip. This should be impossible.'); + + callback(null, header); + + }); +}; + +BlockService.prototype._findBlocksToRemove = function(commonHeader, callback) { + + var self = this; + var hash = self._tip.hash; + var height = self._tip.height; + var blocks = []; + var iterCount = 0; + + async.until(function() { + + return iterCount++ >= self._recentBlockHashes.length || hash === commonHeader.hash; + + }, function(next) { + + self._getBlock(hash, function(err, block) { + + if (err || !block) { + return next(err || new Error('Block Service: block not found in index.')); + } + + self._timestamp.getTimestamp(block.rhash(), function(err, timestamp) { + + if (err || !timestamp) { + return callback(err || new Error('timestamp missing from reorg.')); + } + + block.__height = height; + block.__ts = timestamp; + + blocks.push(block); + + hash = bcoin.util.revHex(block.prevBlock); + height--; + + next(); + + }); + + }); + + }, function(err) { + + if (err) { + return callback(err); + } + + callback(null, blocks); + + }); + +}; + +BlockService.prototype._handleReorg = function(callback) { var self = this; // we want to ensure that we can reask for previously delievered inventory self._p2p.clearInventoryCache(); - log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' + - self._tip.hash + ' common ancestor hash: ' + commonAncestorHeader.hash + ' at height: ' + commonAncestorHeader.height); + var commonAncestorHeader; + var blocksToRemove; async.series([ - self._setTip.bind(self, { hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }), - self._processReorg.bind(self, commonAncestorHeader, hashesToRemove), + + function(next) { + + self._findLatestValidBlockHeader(function(err, _commonAncestorHeader) { + + if(err) { + return next(err); + } + + commonAncestorHeader = _commonAncestorHeader; + next(); + + }); + }, + + function(next) { + + self._findBlocksToRemove(commonAncestorHeader, function(err, _blocksToRemove) { + + if (err) { + return next(err); + } + + blocksToRemove = _blocksToRemove; + + assert(blocksToRemove.length >= 1 && blocksToRemove.length <= self._recentBlockHashes.length, + 'Block Service: the number of blocks to remove looks to be incorrect.'); + + log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' + + self._tip.hash + ' common ancestor hash: ' + commonAncestorHeader.hash + ' at height: ' + commonAncestorHeader.height + + ' There are: ' + blocksToRemove.length + ' block(s) to remove.'); + next(); + }); + }, + + function(next) { + self._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height }, next); + }, + + function(next) { + self._processReorg(commonAncestorHeader, blocksToRemove, next); + } + ], callback); }; -BlockService.prototype._processReorg = function(commonAncestorHeader, hashesToRemove, callback) { +BlockService.prototype._processReorg = function(commonAncestorHeader, blocksToRemove, callback) { var self = this; var operations = []; var blockCount = 0; var bar = new utils.IndeterminateProgressBar(); - log.info('Block Service: Processing the reorganization.'); + async.eachSeries(blocksToRemove, function(block, next) { - async.eachSeries(hashesToRemove, function(tip, next) { + if (process.stdout.isTTY) { + bar.tick(); + } - if (process.stdout.isTTY) { - bar.tick(); + self._onReorg(commonAncestorHeader.hash, block, function(err, ops) { + + if (err) { + return next(err); } - self._getReorgBlock(tip, function(err, block) { + blockCount++; + operations = operations.concat(ops); + self._recentBlockHashes.del(block.rhash()); + next(); - if (err || !block) { - return next(err || new Error('Block Service: block should be in the index.')); - } + }); - self._onReorg(commonAncestorHeader.hash, block, function(err, ops) { - - if (err) { - return next(err); - } - - blockCount++; - operations = operations.concat(ops); - self._recentBlockHashes.del(tip.hash); - next(); - }); - - }); }, function(err) { if (err) { @@ -719,31 +810,6 @@ BlockService.prototype._processReorg = function(commonAncestorHeader, hashesToRe }); }; -BlockService.prototype._getReorgBlock = function(tip, callback) { - - var self = this; - - self._getBlock(tip.hash, function(err, block) { - - if (err || !block) { - return callback(err || new Error('block not found for reorg.')); - } - - self._timestamp.getTimestamp(tip.hash, function(err, timestamp) { - - if (err || !timestamp) { - return callback(err || new Error('timestamp missing from reorg.')); - } - - block.__height = tip.height; - block.__ts = timestamp; - callback(null, block); - }); - - }); - -}; - BlockService.prototype._onBlock = function(block, callback) { var self = this; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index edf3fa7b..740775ab 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -41,7 +41,6 @@ HeaderService.dependencies = [ 'p2p', 'db' ]; HeaderService.MAX_CHAINWORK = new BN(1).ushln(256); HeaderService.STARTING_CHAINWORK = '0000000000000000000000000000000000000000000000000000000100010001'; -// --- public prototype functions HeaderService.prototype.subscribe = function(name, emitter) { this.subscriptions[name].push(emitter); log.info(emitter.remoteAddress, 'subscribe:', 'header/' + name, 'total:', this.subscriptions[name].length); @@ -604,6 +603,8 @@ HeaderService.prototype._getHeader = function(height, hash, callback) { }; +// we aren't go to get fancy with this, we are just going to wipe out the +// last 2000 or so headers and re-ask our peer for the last set of headers. HeaderService.prototype._detectReorg = function(block) { return bcoin.util.revHex(block.prevBlock) !== this._lastHeader.hash; }; @@ -820,7 +821,7 @@ HeaderService.prototype._handleLowTipHeight = function() { HeaderService.prototype._logProgress = function() { - if (!this._initialSync) { + if (!this._initialSync || this._lastTipHeightReported === this._tip.height) { return; } @@ -836,6 +837,7 @@ HeaderService.prototype._logProgress = function() { log.info('Header Service: download progress: ' + this._tip.height + '/' + bestHeight + ' (' + progress + '%)'); + this._lastTipHeightReported = this._tip.height; }; HeaderService.prototype._getP2PHeaders = function(hash) { diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 68e1de35..6e3278d3 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -43,11 +43,11 @@ MempoolService.prototype.start = function(callback) { return callback(err); } self._encoding = new Encoding(prefix); - self._startSubscriptions(); if (self._flush) { return self._flushMempool(callback); } + log.info('Mempool Service: mempool disabled until full sync.'); callback(); }); }; @@ -137,14 +137,13 @@ MempoolService.prototype._startSubscriptions = function() { self._bus.on('p2p/transaction', self._onTransaction.bind(self)); self._bus.subscribe('p2p/transaction'); - self._p2p.on('bestHeight', function() { - log.info('Mempool Service: Geting mempool from peer.'); - self._p2p.getMempool(); - }); - }; MempoolService.prototype.enable = function() { + log.info('Mempool Service: Mempool enabled.'); + this._startSubscriptions(); + log.info('Mempool Service: Geting mempool from peer.'); + this._p2p.getMempool(); this._enabled = true; }; diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 3e78acf8..46b98738 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -422,7 +422,10 @@ TransactionService.prototype.onReorg = function(args, callback) { }; TransactionService.prototype._getSpentInfo = function(input, callback) { - this._db.get(this._encoding.encodeSpentKey(input.prevout.txid(), input.prevout.index), callback); + if (!this.node.stopping) { + return this._db.get(this._encoding.encodeSpentKey(input.prevout.txid(), input.prevout.index), callback); + } + callback(); }; TransactionService.prototype._getSpentTxOperations = function(tx, callback) {