From 3c53b1284c1f934d109bb07b78332dee95441acc Mon Sep 17 00:00:00 2001 From: Patrick Nagurny Date: Fri, 3 Feb 2017 17:06:00 -0500 Subject: [PATCH] got reorg working and wrote test around it --- lib/services/bitcoind/index.js | 2 + lib/services/db/index.js | 27 ++- lib/services/db/reorg.js | 293 +++++++++++++++++++++++++++------ lib/services/db/sync.js | 31 +++- test/services/db/reorg.unit.js | 135 +++++++++++++++ 5 files changed, 431 insertions(+), 57 deletions(-) create mode 100644 test/services/db/reorg.unit.js diff --git a/lib/services/bitcoind/index.js b/lib/services/bitcoind/index.js index 091d666e..f3405bde 100644 --- a/lib/services/bitcoind/index.js +++ b/lib/services/bitcoind/index.js @@ -465,6 +465,8 @@ Bitcoin.prototype._initChain = function(callback) { return callback(self._wrapRPCError(err)); } + self.tiphash = response.result; + self.client.getBlock(response.result, function(err, response) { if (err) { return callback(self._wrapRPCError(err)); diff --git a/lib/services/db/index.js b/lib/services/db/index.js index ea34b4e1..aceb248e 100644 --- a/lib/services/db/index.js +++ b/lib/services/db/index.js @@ -17,6 +17,7 @@ var log = index.log; var Transaction = require('../../transaction'); var Service = require('../../service'); var Sync = require('./sync'); +var Reorg = require('./reorg'); /** * This service synchronizes a leveldb database with bitcoin block chain by connecting and @@ -168,11 +169,31 @@ DB.prototype.start = function(callback) { log.error(err); }); - this._sync.on('reorg', function() { - throw new Error('Reorg detected! Unhandled :(') + this._sync.on('reorg', function(block) { + log.warn('Reorg detected! Tip: ' + self.tip.hash + + ' Concurrent tip: ' + self.concurrentTip.hash + + ' Bitcoind tip: ' + self.node.services.bitcoind.tiphash); + + self.reorg = true; + + var reorg = new Reorg(self.node, self); + reorg.handleReorg(block, function(err) { + if(err) { + log.error('Reorg failed! ' + err); + return self.node.stop(function() {}); + } + + log.warn('Reorg successful! Tip: ' + self.tip.hash + + ' Concurrent tip: ' + self.concurrentTip.hash + + ' Bitcoind tip: ' + self.node.services.bitcoind.tiphash + ); + + self.reorg = false; + self._sync.sync(); + }); }); - this._sync.on('initialsync', function() { + this._sync.on('synced', function() { log.info('Initial sync complete'); }); diff --git a/lib/services/db/reorg.js b/lib/services/db/reorg.js index d27238a6..68e44d5e 100644 --- a/lib/services/db/reorg.js +++ b/lib/services/db/reorg.js @@ -5,68 +5,243 @@ var BufferUtil = bitcore.util.buffer; var log = index.log; var async = require('async'); -function Reorg(node) { +function Reorg(node, db) { this.node = node; - this.db = node.db; + this.db = db; } Reorg.prototype.handleReorg = function(block, callback) { var self = this; - self.findCommonAncestor(block, function(err, ancestorHash) { - if (err) { - return done(err); + self.handleConcurrentReorg(function(err) { + if(err) { + return callback(err); } - log.warn('Reorg common ancestor found:', ancestorHash); - // Rewind the chain to the common ancestor - async.whilst( - function() { - // Wait until the tip equals the ancestor hash - return self.db.tip.hash !== ancestorHash; - }, - function(removeDone) { - var concurrentTip = self.tip; + self.findCommonAncestorAndNewHashes(self.db.tip, block, function(err, commonAncestor, newHashes) { + if(err) { + return callback(err); + } - // TODO: expose prevHash as a string from bitcore - var prevHash = BufferUtil.reverse(tip.header.prevHash).toString('hex'); + self.rewindBothTips(commonAncestor, function(err) { + if(err) { + return callback(err); + } - self.getBlock(prevHash, function(err, previousTip) { - if (err) { - removeDone(err); - } - - // Undo the related indexes for this block - self.disconnectBlock(tip, function(err) { - if (err) { - return removeDone(err); - } - - // Set the new tip - previousTip.__height = self.tip.__height - 1; - self.tip = previousTip; - self.emit('removeblock', tip); - removeDone(); - }); - - }); - - }, - callback - ); + self.fastForwardBothTips(newHashes, callback); + }); + }); }); }; -// Rewind concurrentTip to tip -Reorg.prototype.rewindConcurrentTip = function(callback) { - -}; - -Reorg.prototype.findCommonAncestor = function(block, callback) { +Reorg.prototype.handleConcurrentReorg = function(callback) { var self = this; - var mainPosition = self.db.tip.hash; - var forkPosition = block.hash; + if(self.db.concurrentTip.hash === self.db.tip.hash) { + return callback(); + } + + self.findCommonAncestorAndNewHashes(self.db.concurrentTip, self.db.tip, function(err, commonAncestor, newHashes) { + if(err) { + return callback(err); + } + + self.rewindConcurrentTip(commonAncestor, function(err) { + if(err) { + return callback(err); + } + + self.fastForwardConcurrentTip(newHashes, callback); + }); + }); +}; + +Reorg.prototype.rewindConcurrentTip = function(commonAncestor, callback) { + var self = this; + + async.whilst( + function() { + return self.db.concurrentTip.hash !== commonAncestor; + }, + function(next) { + self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false)); + self.db.store.batch(operations, function(err) { + if(err) { + return next(err); + } + + var prevHash = BufferUtil.reverse(self.db.concurrentTip.header.prevHash).toString('hex'); + + self.node.services.bitcoind.getBlock(prevHash, function(err, block) { + if(err) { + return next(err); + } + + self.db.concurrentTip = block; + next(); + }); + }); + }); + + + }, + callback + ); +}; + +Reorg.prototype.fastForwardConcurrentTip = function(newHashes, callback) { + var self = this; + + async.eachSeries(newHashes, function(hash, next) { + self.node.services.bitcoind.getBlock(hash, function(err, block) { + if(err) { + return next(err); + } + + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getConcurrentTipOperation(block, true)); + self.db.store.batch(operations, function(err) { + if(err) { + return next(err); + } + + self.db.concurrentTip = block; + next(); + }); + }); + }); + }, callback); +}; + +Reorg.prototype.rewindBothTips = function(commonAncestor, callback) { + var self = this; + + async.whilst( + function() { + return self.db.tip.hash !== commonAncestor; + }, + function(next) { + async.parallel( + [ + function(next) { + self.db.getConcurrentBlockOperations(self.db.concurrentTip, false, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getConcurrentTipOperation(self.db.concurrentTip, false)); + next(null, operations); + }); + }, + function(next) { + self.db.getSerialBlockOperations(self.db.tip, false, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getTipOperation(self.db.concurrentTip, false)); + next(null, operations); + }); + } + ], + function(err, results) { + if(err) { + return callback(err); + } + + var operations = results[0].concat(results[1]); + self.db.store.batch(operations, function(err) { + if(err) { + return next(err); + } + + var prevHash = BufferUtil.reverse(self.db.tip.header.prevHash).toString('hex'); + + self.node.services.bitcoind.getBlock(prevHash, function(err, block) { + if(err) { + return next(err); + } + + self.db.concurrentTip = block; + self.db.tip = block; + next(); + }); + }); + } + ); + }, + callback + ); +}; + +Reorg.prototype.fastForwardBothTips = function(newHashes, callback) { + var self = this; + + async.eachSeries(newHashes, function(hash, next) { + self.node.services.bitcoind.getBlock(hash, function(err, block) { + if(err) { + return next(err); + } + + async.parallel( + [ + function(next) { + self.db.getConcurrentBlockOperations(block, true, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getConcurrentTipOperation(block, true)); + next(null, operations); + }); + }, + function(next) { + self.db.getSerialBlockOperations(block, true, function(err, operations) { + if(err) { + return next(err); + } + + operations.push(self.db.getTipOperation(block, true)); + next(null, operations); + }); + } + ], + function(err, results) { + if(err) { + return next(err); + } + + var operations = results[0].concat(results[1]); + + self.db.store.batch(operations, function(err) { + if(err) { + return next(err); + } + + self.db.concurrentTip = block; + self.db.tip = block; + next(); + }); + } + ); + }); + }, callback); +}; + +Reorg.prototype.findCommonAncestorAndNewHashes = function(oldTip, newTip, callback) { + var self = this; + + var mainPosition = oldTip.hash; + var forkPosition = newTip.hash; var mainHashesMap = {}; var forkHashesMap = {}; @@ -75,6 +250,7 @@ Reorg.prototype.findCommonAncestor = function(block, callback) { forkHashesMap[forkPosition] = true; var commonAncestor = null; + var newHashes = [forkPosition]; async.whilst( function() { @@ -116,6 +292,7 @@ Reorg.prototype.findCommonAncestor = function(block, callback) { if(forkBlockHeader && forkBlockHeader.prevHash) { forkHashesMap[forkBlockHeader.prevHash] = true; forkPosition = forkBlockHeader.prevHash; + newHashes.unshift(forkPosition); } else { forkPosition = null; } @@ -146,7 +323,25 @@ Reorg.prototype.findCommonAncestor = function(block, callback) { ) }, function(err) { - done(err, commonAncestor); + if(err) { + return callback(err); + } + + // New hashes are those that are > common ancestor + var commonAncestorFound = false; + for(var i = newHashes.length - 1; i >= 0; i--) { + if(newHashes[i] === commonAncestor) { + commonAncestorFound = true; + } + + if(commonAncestorFound) { + newHashes.shift(); + } + } + + callback(null, commonAncestor, newHashes); } ); -}; \ No newline at end of file +}; + +module.exports = Reorg; \ No newline at end of file diff --git a/lib/services/db/sync.js b/lib/services/db/sync.js index c3dde482..3d316b00 100644 --- a/lib/services/db/sync.js +++ b/lib/services/db/sync.js @@ -66,7 +66,7 @@ inherits(Sync, EventEmitter); Sync.prototype.initialSync = function() { var self = this; - if(this.syncing) { + if(this.syncing || this.db.reorg) { return; } @@ -99,7 +99,7 @@ Sync.prototype.initialSync = function() { // the tip together Sync.prototype.sync = function() { var self = this; - if(this.syncing) { + if(this.syncing || this.db.reorg) { return; } @@ -128,11 +128,16 @@ Sync.prototype.stop = function() { Sync.prototype._handleErrors = function(stream) { var self = this; - stream.on('error', function(err) { + stream.on('error', function(err, block) { self.syncing = false; if(err.reorg) { - return self.emit('reorg'); + return self.emit('reorg', block); + } + + if(err.reorg2) { + // squelch this error + return; } self.emit('error', err); @@ -207,6 +212,8 @@ BlockStream.prototype._read = function() { BlockStream.prototype._process = function() { var self = this; + // TODO push null when we've fully synced + if(this.processing) { return; } @@ -270,7 +277,7 @@ ProcessSerial.prototype._write = function(block, enc, callback) { if(prevHash !== self.tip.hash) { var err = new Error('Reorg detected'); err.reorg = true; - return callback(err); + return self.emit('error', err, block); } if(check()) { @@ -363,6 +370,12 @@ ProcessConcurrent.prototype._flush = function(callback) { WriteStream.prototype._write = function(obj, enc, callback) { var self = this; + if(self.db.reorg) { + var err = new Error('reorg in process'); + err.reorg2 = true; + return callback(err); + } + self.db.store.batch(obj.operations, function(err) { if(err) { return callback(err); @@ -381,6 +394,13 @@ WriteStream.prototype._write = function(obj, enc, callback) { ProcessBoth.prototype._write = function(block, encoding, callback) { var self = this; + + if(self.db.reorg) { + var err = new Error('reorg in process'); + err.reorg2 = true; + return callback(err); + } + async.parallel([function(next) { self.db.getConcurrentBlockOperations(block, true, function(err, operations) { if(err) { @@ -408,6 +428,7 @@ ProcessBoth.prototype._write = function(block, encoding, callback) { } self.db.tip = block; self.db.concurrentTip = block; + console.log('Tip:', self.db.tip.__height); callback(); }); }); diff --git a/test/services/db/reorg.unit.js b/test/services/db/reorg.unit.js new file mode 100644 index 00000000..0da7a02f --- /dev/null +++ b/test/services/db/reorg.unit.js @@ -0,0 +1,135 @@ +'use strict'; + +var should = require('chai').should(); +var sinon = require('sinon'); +var bitcore = require('bitcore-lib'); +var BufferUtil = bitcore.util.buffer; +var Reorg = require('../../../lib/services/db/reorg'); + +describe('Reorg', function() { + describe('full-test', function() { + before(function() { + sinon.stub(BufferUtil, 'reverse', function(input) { + return { + toString: function() { + return input; + } + } + }); + }); + + after(function() { + BufferUtil.reverse.restore(); + }); + + it('should handle a reorg correctly', function(done) { + var tipBlocks = [ + {hash: 'main0', header: {prevHash: null}}, + {hash: 'main1', header: {prevHash: 'main0'}}, + {hash: 'fork1', header: {prevHash: 'main1'}}, + {hash: 'fork2', header: {prevHash: 'fork1'}} + ]; + + var concurrentBlocks = [ + {hash: 'main0', header: {prevHash: null}}, + {hash: 'main1', header: {prevHash: 'main0'}}, + {hash: 'fork1', header: {prevHash: 'main1'}}, + {hash: 'fork2', header: {prevHash: 'fork1'}}, + {hash: 'fork3', header: {prevHash: 'fork2'}} + ]; + + var bitcoindBlocks = [ + {hash: 'main0', header: {prevHash: null}}, + {hash: 'main1', header: {prevHash: 'main0'}}, + {hash: 'main2', header: {prevHash: 'main1'}}, + {hash: 'main3', header: {prevHash: 'main2'}} + ]; + + var allBlocks = tipBlocks.concat(concurrentBlocks, bitcoindBlocks); + + var db = { + tip: tipBlocks[3], + concurrentTip: concurrentBlocks[4], + store: { + batch: sinon.stub().callsArg(1) + }, + getConcurrentBlockOperations: sinon.stub().callsArgWith(2, null, []), + getSerialBlockOperations: sinon.stub().callsArgWith(2, null, []), + getConcurrentTipOperation: sinon.stub().returns(null), + getTipOperation: sinon.stub().returns(null) + }; + + var node = { + services: { + bitcoind: { + getBlock: function(hash, callback) { + var block; + for(var i = 0; i < allBlocks.length; i++) { + if(allBlocks[i].hash === hash) { + block = allBlocks[i]; + } + } + + setImmediate(function() { + if(!block) { + return callback(new Error('Block not found: ' + hash)); + } + + callback(null, block); + }); + }, + getBlockHeader: function(hash, callback) { + var header; + for(var i = 0; i < allBlocks.length; i++) { + if(allBlocks[i].hash === hash) { + header = allBlocks[i].header; + } + } + + setImmediate(function() { + if(!header) { + return callback(new Error('Block header not found: ' + hash)); + } + + callback(null, header); + }); + } + } + } + } + + var reorg = new Reorg(node, db); + + reorg.handleReorg(bitcoindBlocks[3], function(err) { + should.not.exist(err); + + db.tip.hash.should.equal('main3'); + db.concurrentTip.hash.should.equal('main3'); + + db.getConcurrentBlockOperations.callCount.should.equal(5); + db.getConcurrentBlockOperations.args[0][0].should.equal(concurrentBlocks[4]); + db.getConcurrentBlockOperations.args[0][1].should.equal(false); + db.getConcurrentBlockOperations.args[1][0].should.equal(concurrentBlocks[3]); + db.getConcurrentBlockOperations.args[1][1].should.equal(false); + db.getConcurrentBlockOperations.args[2][0].should.equal(concurrentBlocks[2]); + db.getConcurrentBlockOperations.args[2][1].should.equal(false); + db.getConcurrentBlockOperations.args[3][0].should.equal(bitcoindBlocks[2]); + db.getConcurrentBlockOperations.args[3][1].should.equal(true); + db.getConcurrentBlockOperations.args[4][0].should.equal(bitcoindBlocks[3]); + db.getConcurrentBlockOperations.args[4][1].should.equal(true); + + db.getSerialBlockOperations.callCount.should.equal(4); + db.getSerialBlockOperations.args[0][0].should.deep.equal(tipBlocks[3]); + db.getSerialBlockOperations.args[0][1].should.equal(false); + db.getSerialBlockOperations.args[1][0].should.deep.equal(tipBlocks[2]); + db.getSerialBlockOperations.args[1][1].should.equal(false); + db.getSerialBlockOperations.args[2][0].should.deep.equal(bitcoindBlocks[2]); + db.getSerialBlockOperations.args[2][1].should.equal(true); + db.getSerialBlockOperations.args[3][0].should.deep.equal(bitcoindBlocks[3]); + db.getSerialBlockOperations.args[3][1].should.equal(true); + + done(); + }); + }); + }); +}); \ No newline at end of file