From ffa63fc146934b3477956433bab056b280a84fcb Mon Sep 17 00:00:00 2001 From: Chris Kleeschulte Date: Sun, 1 Oct 2017 19:15:20 -0400 Subject: [PATCH] Added reorg fixes. --- lib/services/block/index.js | 58 +++++++++++++++++++++++++++---- lib/services/header/index.js | 16 +++++++-- lib/services/p2p/index.js | 21 ++++++++--- package-lock.json | 32 ++++++++--------- package.json | 2 +- test/services/block/index.unit.js | 25 +++++++------ test/services/db/index.unit.js | 2 +- 7 files changed, 114 insertions(+), 42 deletions(-) diff --git a/lib/services/block/index.js b/lib/services/block/index.js index 69094e7e..cd912832 100644 --- a/lib/services/block/index.js +++ b/lib/services/block/index.js @@ -23,7 +23,7 @@ var BlockService = function(options) { this._timestamp = this.node.services.timestamp; this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; - this._initialSync = true; + this._initialSync = false; this._reorgBackToBlock = null; // use this to rewind your indexes to a specific point by height or hash this._timeOfLastBlockReport = Date.now() - 30000; this._blocksInQueue = 0; @@ -171,6 +171,7 @@ BlockService.prototype._checkTip = function(callback) { log.info('Block Service: checking the saved tip...'); if (self._reorgBackToBlock) { + self._reorgBackToBlock = false; log.warn('Block Service: we were asked to reorg back to block: ' + self._reorgBackToBlock); return self._reorgBackTo(callback); } @@ -188,6 +189,7 @@ BlockService.prototype._checkTip = function(callback) { return callback(); } + self._reorging = true; self._findCommonAncestor(function(err, commonAncestorHeader) { if(err) { return callback(err); @@ -494,6 +496,7 @@ BlockService.prototype._onReorg = function(commonAncestorHash, block, callback) BlockService.prototype._removeAllSubscriptions = function() { this._bus.unsubscribe('p2p/block'); this._bus.removeAllListeners(); + this.removeAllListeners(); // will remove listeners for 'next block' and 'synced' this._subscribedBlock = false; }; @@ -501,31 +504,45 @@ BlockService.prototype.onHeaders = function(callback) { var self = this; - // when this fires, we switch into initial sync mode (like first boot) - // this includes finishing any blocks that may be in the queue, then - // checking the tip, etc. - // we may be in a reorg situation at the time this function runs - // because the header service runs this function after it completes its - // reorg routines. + // this fires under 2 conditions: + // 1. on initial boot right after all headers are synced by the header service + // 2. right after the header service handles a reorg + + self._initialSync = true; + self._removeAllSubscriptions(); + // check whether or not we need build a new tip (unlikely) self._resetTip(function(err) { + if (err) { return callback(err); } + + // clear out the blocks queue, if any blocks exist there + // if this is a reorg sitch, then there may be blocks, but none of + // them will be saved. async.retry(function(next) { next(self._blocksInQueue > 0); }, function() { + // check to see if our current tip matches what the header service has. + // if this is a reorg during our initial block sync, it is especially important + // to see if we've synced past where the reorg/fork took place. self._checkTip(function(err) { if(err) { return callback(err); } + // we've checked the tip, handled any reorg for ourselves, and we are ready to + // sync any new blocks that might exist after our tip self._startSync(); + + // once we start syncing, we can call back to the header service, so that it can + // process the next block in its queue. callback(); }); @@ -760,14 +777,20 @@ BlockService.prototype._handleError = function(err) { BlockService.prototype._syncBlock = function(block) { var self = this; + clearTimeout(self._getBlocksTimer); + self._saveBlock(block, function(err) { + if(err) { return self._handleError(err); } + if (self._tip.height < self._header.getLastHeader().height) { return self.emit('next block'); } + self.emit('synced'); + }); }; @@ -837,6 +860,12 @@ BlockService.prototype._sync = function() { return; } + if (self._currentQuery === self._tip.hash) { + return; + } + + self._currentQuery = self._tip.hash; + self._header.getNextHash(self._tip, function(err, targetHash, nextHash) { if(err) { @@ -847,6 +876,21 @@ BlockService.prototype._sync = function() { // this will lead to duplicate transactions being sent self._p2p.clearInventoryCache(); + // if we don't get our callback called in due time, + // then we must assume we've reorg'ed very shortly after + // we made this call and we should re-compute where we are + self._getBlocksTimer = setTimeout(function() { + self._currentQuery = null; + self.emit('next block'); + }, 5000); + + self._getBlocksTimer.unref(); + + // TODO; research how different bitcoin implementation handle block + // locator objects. If you pass a block locator object that has one + // block hash and that block hash is not on the main chain, then will + // the peer send an inv for block 1 or no inv at all? + self._p2p.getP2PBlock({ filter: { startHash: self._tip.hash, diff --git a/lib/services/header/index.js b/lib/services/header/index.js index 7a6658d1..612ede65 100644 --- a/lib/services/header/index.js +++ b/lib/services/header/index.js @@ -28,6 +28,8 @@ var HeaderService = function(options) { this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network]; this._lastHeader = null; this._initialSync = true; + + this._slowMode = options.slowMode; }; inherits(HeaderService, BaseService); @@ -738,6 +740,7 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) { var newPeerHeaders = new utils.SimpleMap(); var headerCount = 0; + // TODO: these 2 list could be incredibly long lists and could cause a memory crash self.getAllHeaders(function(err, allHeaders) { if (err) { @@ -820,7 +823,7 @@ HeaderService.prototype._handleLowTipHeight = function() { } // our peer has reorg'ed to lower overall height. - // we should get the first block after the split, reorg back to this height and then continue. + // we should get the first block after the split, reorg back to this height - 1 and then continue. // we should still have no listeners for anything, blocks, headers, etc. at this point self._p2p.getP2PBlock({ filter: { @@ -831,6 +834,7 @@ HeaderService.prototype._handleLowTipHeight = function() { }, function(block) { self._initialSync = true; + self._handleReorg(block, reorgInfo.commonHeader, function(err) { if(err) { @@ -843,6 +847,8 @@ HeaderService.prototype._handleLowTipHeight = function() { self._handleError(err); } + // run start sync again. This time we should be back on track. + self._startSync(); }); }); @@ -903,7 +909,7 @@ HeaderService.prototype.getNextHash = function(tip, callback) { var numResultsNeeded = 2; // if the tip being passed in is the second to last block, then return 0 because there isn't a block - if (tip.height + 1 === self._tip.height) { + if (tip.height + 1 >= self._tip.height) { numResultsNeeded = 1; } @@ -940,6 +946,12 @@ HeaderService.prototype.getNextHash = function(tip, callback) { results[1] = 0; } + if (self._slowMode) { + return setTimeout(function() { + callback(null, results[0], results[1]); + }, self._slowMode); + } + callback(null, results[0], results[1]); }); diff --git a/lib/services/p2p/index.js b/lib/services/p2p/index.js index 8a7647d0..a2119fbd 100644 --- a/lib/services/p2p/index.js +++ b/lib/services/p2p/index.js @@ -51,13 +51,26 @@ P2P.prototype.getNumberOfPeers = function() { P2P.prototype.getP2PBlock = function(opts, callback) { // opts is { filter: {}, blockHash: block hash we want } - var peer = this._getPeer(); + var self = this; + var peer = self._getPeer(); - var blockFilter = this._setResourceFilter(opts.filter, 'blocks'); + var blockFilter = self._setResourceFilter(opts.filter, 'blocks'); - this.once(opts.blockHash, callback); + // there is a possibility that the main chain has reorganized after we last + // computed our expected block and before our peer computes what block to + // send us in response. + // In self case, we want to abandon self block and remove its listener. + // Our caller should also reset its expectations and re-compute its expected + // block and call us again. + // If we are wrong about the reorg, then the peer is just really slow and we ought + // to use self peer anyway. + setTimeout(function() { + self.removeListener(opts.blockHash, callback); + }, 5000); - peer.sendMessage(this.messages.GetBlocks(blockFilter)); + self.once(opts.blockHash, callback); + + peer.sendMessage(self.messages.GetBlocks(blockFilter)); }; P2P.prototype.getHeaders = function(filter) { diff --git a/package-lock.json b/package-lock.json index 9cf16a67..a98b8230 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "bitcore-node", - "version": "5.0.0-beta.12", + "version": "5.0.0-beta.13", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -2629,11 +2629,11 @@ } }, "leveldown": { - "version": "1.8.0", - "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-1.8.0.tgz", - "integrity": "sha512-4r02OXouz24Sxs+i8ZNgDZxrRMRZq6BWS7pj6vjKEb8DFnEsveyaPIxF9Y3uHL5+jftWUDsmThT90epRrC6aGw==", + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/leveldown/-/leveldown-1.9.0.tgz", + "integrity": "sha512-3MwcrnCUIuFiKp/jSrG1UqDTV4k1yH8f5HH6T9dpqCKG+lRxcfo2KwAqbzTT+TTKfCbaATeHMy9mm1y6sI3ZvA==", "requires": { - "abstract-leveldown": "2.7.0", + "abstract-leveldown": "2.7.1", "bindings": "1.3.0", "fast-future": "1.0.2", "nan": "2.7.0", @@ -2641,9 +2641,9 @@ }, "dependencies": { "abstract-leveldown": { - "version": "2.7.0", - "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-2.7.0.tgz", - "integrity": "sha512-maam3ZrTeORbXKEJUeJZkYOsorEwr060WitXuQlUuIFlg0RofyyHts49wtaVmShJ6l0wEWB0ZtPhf6QYBA7D2w==", + "version": "2.7.1", + "resolved": "https://registry.npmjs.org/abstract-leveldown/-/abstract-leveldown-2.7.1.tgz", + "integrity": "sha512-CUMZ9D4zNd0KHWJsce57ZbQKRJxh/6jqCFLDBf3E8IwtI7J/JbSs2pyI8YtO264XEMX4bssrAly53vvAv4qifQ==", "requires": { "xtend": "4.0.1" } @@ -4266,6 +4266,14 @@ "resolved": "https://registry.npmjs.org/statuses/-/statuses-1.3.1.tgz", "integrity": "sha1-+vUbnrdKrvOzrPStX2Gr8ky3uT4=" }, + "string_decoder": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", + "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", + "requires": { + "safe-buffer": "5.1.1" + } + }, "string-length": { "version": "1.0.1", "resolved": "https://registry.npmjs.org/string-length/-/string-length-1.0.1.tgz", @@ -4285,14 +4293,6 @@ "strip-ansi": "3.0.1" } }, - "string_decoder": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.0.3.tgz", - "integrity": "sha512-4AH6Z5fzNNBcH+6XDMfA/BTt87skxqJlO0lAh3Dker5zThcAxG6mKz+iGu308UKoPPQ8Dcqx/4JhujzltRa+hQ==", - "requires": { - "safe-buffer": "5.1.1" - } - }, "stringstream": { "version": "0.0.5", "resolved": "https://registry.npmjs.org/stringstream/-/stringstream-0.0.5.tgz", diff --git a/package.json b/package.json index 1ba2385b..e4ffb2ba 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,7 @@ "node": ">=8.0.0" }, "author": "BitPay ", - "version": "5.0.0-beta.12", + "version": "5.0.0-beta.13", "main": "./index.js", "repository": "git://github.com/bitpay/bitcore-node.git", "homepage": "https://github.com/bitpay/bitcore-node", diff --git a/test/services/block/index.unit.js b/test/services/block/index.unit.js index 22ca27b9..ef44c2d4 100644 --- a/test/services/block/index.unit.js +++ b/test/services/block/index.unit.js @@ -51,17 +51,15 @@ describe('Block Service', function() { sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, block2); blockService._tip = { hash: 'aa' }; var headers = new utils.SimpleMap(); - headers.set('aa', 'someheader'); - blockService._header = { getAllHeaders: sandbox.stub().callsArgWith(0, null, headers) }; - - blockService._findCommonAncestor(function(err, commonAncestorHashActual) { + blockService._header = { getBlockHeader: sandbox.stub().callsArgWith(1, null, { hash: 'bb' }) }; + blockService._findCommonAncestor(function(err, header) { if(err) { return done(err); } - expect(commonAncestorHashActual).to.equal('aa'); + expect(header).to.deep.equal({ hash: 'bb' }); done(); }); @@ -128,11 +126,16 @@ describe('Block Service', function() { describe('#_setTip', function() { - it('should set the tip if given a block', function() { - blockService._db = {}; + it('should set the tip if given a block', function(done) { + var saveTip = sandbox.stub(blockService, '_saveTip').callsArgWith(1, null); blockService._tip = { height: 99, hash: '00' }; - blockService._setTip({ height: 100, hash: 'aa' }); - expect(blockService._tip).to.deep.equal({ height: 100, hash: 'aa' }); + blockService._setTip({ height: 100, hash: 'aa' }, function(err) { + if(err) { + return done(err); + } + expect(blockService._tip).to.deep.equal({ height: 100, hash: 'aa' }); + done(); + }); }); }); @@ -151,11 +154,11 @@ describe('Block Service', function() { describe('#start', function() { - it('should get the prefix', function(done) { + it('should get the service started', function(done) { var getPrefix = sandbox.stub().callsArgWith(1, null, blockService._encoding); var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 1, hash: 'aa' }); var performSanityCheck = sandbox.stub(blockService, '_performSanityCheck').callsArgWith(1, null, { hash: 'aa', height: 123 }); - var setTip = sandbox.stub(blockService, '_setTip'); + var setTip = sandbox.stub(blockService, '_setTip').callsArgWith(1, null); blockService.node = { openBus: sandbox.stub() }; blockService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip }; blockService.start(function() { diff --git a/test/services/db/index.unit.js b/test/services/db/index.unit.js index d2f2833c..482d5554 100644 --- a/test/services/db/index.unit.js +++ b/test/services/db/index.unit.js @@ -237,7 +237,7 @@ describe('DB', function() { it('should close the store if there is a store and it is open', function(done) { var close = sandbox.stub().callsArgWith(0, null); - dbService._store = { isOpen: sinon.stub().returns(true), close: close }; + dbService._store = { isClosed: sinon.stub().returns(false), close: close }; dbService.close(function(err) { if(err) {