diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 055f37b5..c2b14302 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -28,7 +28,6 @@ var BlockService = function(options) { this._blockCount = 0; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._initialSync = true; - this._reorgQueue = []; }; inherits(BlockService, BaseService); @@ -50,18 +49,29 @@ BlockService.prototype.getAPIMethods = function() { }; BlockService.prototype.getInfo = function(callback) { - callback(null, { - blocks: this.getTip().height, - connections: this._p2p.getNumberOfPeers(), - timeoffset: 0, - proxy: '', - testnet: this.node.network === 'livenet' ? false: true, - errors: '', - network: this.node.network, - relayFee: 0, - version: 'bitcore-1.1.2', - protocolversion: 700001, - difficulty: this._header.getCurrentDifficulty() + var self = this; + + // check to ensure the header service is all up-to-date + async.retry(function(next) { + var ready = self._header.getBestHeight() < self.getTip().height; + next(ready); + }, function(err) { + if (err) { + return callback(err); + } + callback(null, { + blocks: self.getTip().height, + connections: self._p2p.getNumberOfPeers(), + timeoffset: 0, + proxy: '', + testnet: self.node.network === 'livenet' ? false: true, + errors: '', + network: self.node.network, + relayFee: 0, + version: 'bitcore-1.1.2', + protocolversion: 700001, + difficulty: self._header.getCurrentDifficulty() + }); }); }; @@ -189,14 +199,7 @@ BlockService.prototype._checkTip = function(callback) { return callback(err || new Error('All headers not found.')); } - assert(self._reorgQueue.length === 0, 'Reorg Queue is not the expected length.'); - - var hash = header.hash; - if (self._reorgQueue.indexOf(hash) === -1) { - self._reorgQueue.push(hash); - } - - self._handleReorg(hash, headers, callback); + self._handleReorg(header.hash, headers, callback); }); @@ -280,79 +283,41 @@ BlockService.prototype._detectReorg = function(block) { return false; }; +BlockService.prototype._getOldHeaders = function(commonAncestorHeader, allHeaders) { + + var header = allHeaders.get(this._tip.hash); + assert(header, 'Header needed to find old blocks during reorg.'); + var headers = []; + + while (header.height > commonAncestorHeader.height) { + headers.push(header); + header = allHeaders.get(header.prevHash); + } + + return headers; + +}; + BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) { + // the prev hash of the passed in "hash" should also point to a block who has the same prev hash. + var self = this; - var count = 0; - var _oldTip = self._tip.hash; - var _newTip = hash; - var oldBlocks = []; - assert(_oldTip, 'We don\'t have a tip hash to reorg away from!'); + var reorgHeader = allHeaders.get(hash); + assert(reorgHeader, 'No reorg header found, cannot find common ancestor.'); - async.whilst( - // test case - function() { + var prevHash = reorgHeader.prevHash; - return _oldTip !== _newTip && ++count < allHeaders.size; + self.getBlock(prevHash, function(err, block) { - }, - // get block - function(next) { + if (err) { + return callback(err); + } - // old tip (our current tip) has to be in database - self._db.get(self._encoding.encodeBlockKey(_oldTip), function(err, data) { - - if (err || !data) { - return next(err || new Error('missing block')); - } - - // once we've found the old tip, we will find its prev and check to see if matches new tip's prev - var block = self._encoding.decodeBlockValue(data); - - // for the purposes of building the old blocks list - - // apply the block's height - block.__height = self._tip.height; - - // apply the block's timestamp - self._timestamp.getTimestamp(block.rhash(), function(err, timestamp) { - - if (err || !timestamp) { - return next(err || new Error('missing timestamp')); - } - - block.__ts = timestamp; - - // we will squirrel away the block because our services will need to remove it after we've found the common ancestor - oldBlocks.push(block); - - // this is our current tip's prev hash - _oldTip = bcoin.util.revHex(block.prevBlock); - - // our current headers have the correct state of the chain, so consult that for its prev hash - var header = allHeaders.get(_newTip); - - assert(header, 'The expected hash is not in the headers list.'); - - // set new tip to the prev hash - _newTip = header.prevHash; - next(); - - }); - }); - - }, function(err) { - - if (err) { - return callback(err); - } - - assert(_newTip === _oldTip, 'A common ancestor hash could not be found.'); - - callback(null, _newTip, oldBlocks); - - }); + var commonAncestorHeader = allHeaders.get(block.rhash()); + callback(null, commonAncestorHeader); + }); }; BlockService.prototype._getBlock = function(hash, callback) { @@ -402,18 +367,15 @@ BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback) var self = this; - self._reorging = true; - var reorgHeader = allHeaders.get(reorgHash); assert(reorgHeader, 'We were asked to reorg to a non-existent hash.'); - self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHash, oldBlocks) { + self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHeader) { if (err) { return callback(err); } - var commonAncestorHeader = allHeaders.get(commonAncestorHash); assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.'); // if we are syncing and we haven't sync'ed to the common ancestor hash, we can safely ignore this reorg @@ -421,23 +383,25 @@ BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback) return callback(); } - log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + - ' at height: ' + commonAncestorHeader.height + '. Removing: ' + oldBlocks.length + ' block(s) from our indexes.'); + var reorgHeaders = self._getOldHeaders(commonAncestorHeader, allHeaders); - self._processReorg(commonAncestorHeader, oldBlocks, function(err) { + assert(reorgHeaders, 'Expected to have reorg headers to remove'); + + log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash + + ' and height: ' + commonAncestorHeader.height + '. Removing a total of: ' + reorgHeaders.length + ' block(s).'); + + self._processReorg(commonAncestorHeader, reorgHeaders, function(err) { if(err) { return callback(err); } - self._reorging = false; callback(); }); }); - }; // this JUST rewinds the chain back to the common ancestor block, nothing more @@ -491,48 +455,59 @@ BlockService.prototype._onAllHeaders = function() { }; - -BlockService.prototype._processReorg = function(commonAncestorHeader, oldBlocks, callback) { - +BlockService.prototype._processReorg = function(commonAncestorHeader, reorgHeaders, callback) { var self = this; var operations = []; var services = self.node.services; - async.eachSeries( - services, - function(mod, next) { - if(mod.onReorg) { - mod.onReorg.call(mod, [commonAncestorHeader, oldBlocks], function(err, ops) { - if (err) { - return next(err); - } - if (ops) { - operations = operations.concat(ops); - } - next(); - }); - } else { - setImmediate(next); - } - }, + // process one block at a time just in case we have a huge amount of blocks to back out + async.eachSeries(reorgHeaders, function(header, next) { - function(err) { - - if (err) { - return callback(err); - } - - self._db.batch(operations, function(err) { - - if (err) { - return callback(err); + async.waterfall([ + function(next) { + self.getBlock(header.hash, next); + }, + function(block, next) { + if (!block) { + return next(new Error('block not found for reorg.')); } + self._timestamp.getTimestamp(header.hash, function(err, timestamp) { + if (err || !timestamp) { + return next(err || new Error('timestamp missing from reorg.')); + } + block.__height = header.height; + block.__ts = timestamp; + next(null, block); + }); + }, + function(block, next) { + async.eachSeries(services, function(mod, next) { + if(mod.onReorg) { + mod.onReorg.call(mod, [commonAncestorHeader, [block]], function(err, ops) { + if (err) { + return next(err); + } + if (ops) { + operations = operations.concat(ops); + } + self._onReorg(commonAncestorHeader, [block], next); + }); + } else { + setImmediate(next); + } + }, next); + } + ], next); + }, function(err) { - self._onReorg(commonAncestorHeader, oldBlocks, callback); - - }); + if (err) { + return callback(err); } - ); + + self._db.batch(operations, callback); + + }); + }; BlockService.prototype._processBlock = function(block) { @@ -613,9 +588,9 @@ BlockService.prototype.onBlock = function(block, callback) { }); }; -BlockService.prototype._onBlock = function(block) { +BlockService.prototype.newBlock = function(block) { - if (this.node.stopping || this._reorging || this._tip.hash === block.rhash()) { + if (this.node.stopping || this._tip.hash === block.rhash()) { return; } @@ -635,41 +610,19 @@ BlockService.prototype._onBlock = function(block) { }; BlockService.prototype._setListeners = function() { - var self = this; - self._header.on('headers', self._onAllHeaders.bind(self)); +}; - self._header.on('reorg', function(block, headers) { - - log.debug('Block Service: detected a reorg from the header service.'); - - if (self._reorgQueue.indexOf(block) === -1) { - self._reorgQueue.push(block); +BlockService.prototype.newReorg = function(block, headers, callback) { + var self = this; + log.info('Block Service: detected a reorg from the header service.'); + self._handleReorg(block.rhash(), headers, function(err) { + if (err) { + return callback(err); } - - var _block; - - async.whilst(function() { - _block = self._reorgQueue.shift(); - return _block; - }, function(next) { - self._handleReorg(_block.rhash(), headers, function(err) { - if (err) { - return next(err); - } - self._onBlock(_block); - next(); - }); - }, function(err) { - if (err) { - log.error(err); - self.node.stop(); - } - }); - + self.newBlock(block, callback); }); - }; BlockService.prototype._setTip = function(tip) { @@ -687,21 +640,6 @@ BlockService.prototype._startSync = function() { this._sync(); }; -BlockService.prototype._startSubscriptions = function() { - - if (this._subscribed) { - return; - } - - this._subscribed = true; - if (!this._bus) { - this._bus = this.node.openBus({remoteAddress: 'localhost-block'}); - } - - this._bus.on('header/block', this._onBlock.bind(this)); - this._bus.subscribe('header/block'); -}; - BlockService.prototype._sync = function() { var self = this; @@ -742,7 +680,7 @@ BlockService.prototype._sync = function() { endHash: nextHash }, blockHash: targetHash - }, self._onBlock.bind(self)); + }, self.newBlock.bind(self)); }); @@ -754,7 +692,6 @@ BlockService.prototype._sync = function() { log.info('Block Service: The best block hash is: ' + self._tip.hash + ' at height: ' + self._tip.height); - this._startSubscriptions(); }; module.exports = BlockService; diff --git a/lib/services/header/index.js b/lib/services/header/index.js index f55da4a6..bf607da9 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -27,7 +27,6 @@ var HeaderService = function(options) { this._checkpoint = options.checkpoint || 2000; // set to -1 to resync all headers. this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._lastHeader = null; - this.blockServiceSyncing = true; this.lastBlockQueried = null; this._initialSync = true; }; @@ -281,11 +280,12 @@ HeaderService.prototype._processBlocks = function(block, callback) { return callback(err); } - if (!self.blockServiceSyncing) { - self._broadcast(block); - } - - callback(); + async.eachSeries(self.node.services, function(mod, next) { + if (!mod.newBlock) { + return setImmediate(next); + } + mod.newBlock.call(mod, block, next); + }, callback); }); @@ -307,10 +307,6 @@ HeaderService.prototype._persistHeader = function(block, callback) { } - log.warn('Header Service: Reorganization detected, current tip hash: ' + - self._tip.hash + ', new block causing the reorg: ' + block.rhash() + - ' common ancestor hash: ' + commonHeader.hash); - self._handleReorg(block, commonHeader, function(err) { if(err) { @@ -409,7 +405,8 @@ HeaderService.prototype._onHeaders = function(headers) { header = header.toObject(); - assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height); + assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height); var ops = self._onHeader(header); @@ -567,7 +564,8 @@ HeaderService.prototype._detectReorg = function(block, callback) { return callback(null, header); } - log.warn('Block: ' + block.rhash() + 'references: ' + prevHash + ' as its previous block, yet we have not stored this block in our data set, thus ignoring this block.'); + log.warn('Block: ' + block.rhash() + 'references: ' + prevHash + + ' as its previous block, yet we have not stored this block in our data set, thus ignoring this block.'); callback(null, false); }); @@ -577,6 +575,12 @@ HeaderService.prototype._detectReorg = function(block, callback) { HeaderService.prototype._handleReorg = function(block, commonHeader, callback) { var self = this; + + log.warn('Header Service: Reorganization detected, current tip hash: ' + + self._tip.hash + ', new block causing the reorg: ' + block.rhash() + + ' common ancestor hash: ' + commonHeader.hash + ' and height: ' + + commonHeader.height); + var reorgHeader = self._formatHeader(block); self.getAllHeaders(function(err, headers) { @@ -595,11 +599,12 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) { return callback(err); } - // emit the fact that there is a reorg even though the block - // service may not have reached this point in its sync. - // Let the block service sort that our - self.emit('reorg', block, headers); - return callback(); + async.eachSeries(self.node.services, function(mod, next) { + if (!mod.newReorg) { + return setImmediate(next); + } + mod.newReorg.call(mod, block, headers, next); + }, callback); }); }); @@ -689,8 +694,6 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) { var self = this; - self._removeAllSubscriptions(); - var newPeerHeaders = new utils.SimpleMap(); var headerCount = 0; @@ -757,6 +760,8 @@ HeaderService.prototype._handleLowTipHeight = function() { self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' + ' Checking the new peer\'s headers for a reorganization event.'); + self._removeAllSubscriptions(); + self._findReorgConditionInNewPeer(function(err, reorgInfo) { if (err) { @@ -794,6 +799,7 @@ HeaderService.prototype._handleLowTipHeight = function() { return self.node.stop(); } + self._startHeaderSubscription(); self._sync(); }); diff --git a/lib/services/mempool/index.js b/lib/services/mempool/index.js index 8de50038..302affcf 100644 --- a/lib/services/mempool/index.js +++ b/lib/services/mempool/index.js @@ -1,7 +1,6 @@ 'use strict'; var BaseService = require('../../service'); var util = require('util'); -var utils = require('../../utils'); var Encoding = require('./encoding'); var index = require('../../'); var log = index.log; @@ -94,7 +93,9 @@ MempoolService.prototype.onReorg = function(args, callback) { } } - callback(null, removalOps); + setImmediate(function() { + callback(null, removalOps); + }); }; MempoolService.prototype._startSubscriptions = function() { diff --git a/lib/services/transaction/index.js b/lib/services/transaction/index.js index 37cf7fb6..43ce75cb 100644 --- a/lib/services/transaction/index.js +++ b/lib/services/transaction/index.js @@ -317,7 +317,9 @@ TransactionService.prototype.onReorg = function(args, callback) { } } - callback(null, removalOps); + setImmediate(function() { + callback(null, removalOps); + }); }; diff --git a/test/services/block/index.unit.js b/test/services/block/index.unit.js index 0a96bbc2..9d5b9d42 100644 --- a/test/services/block/index.unit.js +++ b/test/services/block/index.unit.js @@ -43,16 +43,44 @@ describe('Block Service', function() { }); }); + describe('#_getOldBlocks', function() { + + it('should get the old block list (blocks to be removed from indexes)', function(done) { + + blockService._tip = { height: 10, hash: 'cc' }; + + var header = { height: 8, hash: 'aa' }; + var getStub = sandbox.stub(); + + getStub.onCall(0).returns({ hash: 'bb', height: 9 }); + getStub.onCall(1).returns({ hash: 'aa', height: 8 }); + + var allHeaders = { get: getStub }; + sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, {}); + + blockService._timestamp = { getTimestamp: sinon.stub().callsArgWith(1, null, 1234) }; + + blockService._getOldBlocks(header, allHeaders, function(err, blocks) { + if (err) { + return done(err); + } + expect(blocks.length).to.equal(1); + expect(blocks[0].__height).to.equal(9); + expect(blocks[0].__ts).to.equal(1234); + done(); + }); + + }); + }); + describe('#_findCommonAncestor', function() { it('should find the common ancestor between the current chain and the new chain', function(done) { - blockService._tip = { hash: block2.rhash(), height: 70901 }; - var data = blockService._encoding.encodeBlockValue(block2); - blockService._db = { get: sandbox.stub().callsArgWith(1, null, data) }; - blockService._timestamp = { getTimestamp: sandbox.stub().callsArgWith(1, null, 1234) }; - var commonAncestorHash = bcoin.util.revHex(block2.prevBlock); - var allHeaders = { get: sandbox.stub().returns({ prevHash: commonAncestorHash }), size: 1e6 }; + sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, block1); + sandbox.stub(blockService, '_getOldBlocks').callsArgWith(2, null, [block2]); + + var allHeaders = { get: sandbox.stub().returns({ hash: 'somecommonancestor', prevHash: block1.rhash() }), size: 1e6 }; blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) { @@ -60,9 +88,7 @@ describe('Block Service', function() { return done(err); } - block2.__ts = 1234; - block2.__height = 70901; - expect(commonAncestorHashActual).to.equal(commonAncestorHash); + expect(commonAncestorHashActual).to.equal('somecommonancestor'); expect(oldBlockList).to.deep.equal([block2]); done(); @@ -123,11 +149,9 @@ describe('Block Service', function() { it('should set listeners for headers, reorg', function() { var on = sandbox.stub(); - var once = sandbox.stub(); - blockService._header = { on: on, once: once }; + blockService._header = { on: on }; blockService._setListeners(); - expect(on.calledOnce).to.be.true; - expect(once.calledOnce).to.be.true; + expect(on.calledTwice).to.be.true; }); }); diff --git a/test/services/header/index.unit.js b/test/services/header/index.unit.js index 374afbdc..ae2466d5 100644 --- a/test/services/header/index.unit.js +++ b/test/services/header/index.unit.js @@ -10,6 +10,7 @@ var utils = require('../../../lib/utils'); var Block = require('bitcore-lib').Block; var BN = require('bn.js'); var Emitter = require('events').EventEmitter; +var bcoin = require('bcoin'); describe('Header Service', function() { @@ -101,11 +102,15 @@ describe('Header Service', function() { it('should start the sync process', function() { headerService._bestHeight = 123; + var unsub = sinon.stub(); headerService._tip = { height: 120 }; + headerService._bus = { unsubscribe: unsub }; + headerService._blockProcessor = { length: 0 }; var getHeaders = sandbox.stub(); headerService._p2p = { getHeaders: getHeaders }; headerService._lastHeader = { height: 124 }; headerService._startSync(); + expect(unsub.calledOnce).to.be.true; expect(getHeaders.calledOnce).to.be.true; }); @@ -133,7 +138,11 @@ describe('Header Service', function() { var onHeader = sandbox.stub(headerService, '_onHeader'); var saveHeaders = sandbox.stub(headerService, '_saveHeaders'); headerService._tip = { height: 123, hash: 'aa' }; + + headerService._lastHeader = { hash: header.prevHash }; + headerService._onHeaders(headers); + expect(onHeader.calledOnce).to.be.true; expect(saveHeaders.calledOnce).to.be.true;