Fixed edge case reorg issues.

This commit is contained in:
Chris Kleeschulte 2017-09-24 12:34:27 -04:00
parent b160814706
commit a5f9d1a6d0
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
6 changed files with 505 additions and 501 deletions

View File

@ -10,6 +10,7 @@ var utils = require('../../utils');
var assert = require('assert');
var constants = require('../../constants');
var bcoin = require('bcoin');
var _ = require('lodash');
var BlockService = function(options) {
@ -23,7 +24,6 @@ var BlockService = function(options) {
this._subscriptions = {};
this._subscriptions.block = [];
this._subscriptions.reorg = [];
this._blockCount = 0;
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
@ -168,35 +168,62 @@ BlockService.prototype.getRawBlock = function(hash, callback) {
};
BlockService.prototype._checkTip = function(callback) {
// check to see if our own tip is no longer in the main chain
// (there was a reorg while we were shut down)
var self = this;
self._header.getBlockHeader(self._tip.height, function(err, header) {
if (err || !header) {
return callback(err || new Error('Header at height: ' + self._tip.height + ' was not found.'));
if (err) {
return callback(err);
}
header = header || self._header.getLastHeader();
if (header.hash === self._tip.hash) {
log.info('Block Service: saved tip is good to go.');
return callback();
}
// means the header service no longer tracks our tip on the main chain
// so we should consider ourselves in a reorg situation
self._header.getAllHeaders(function(err, headers) {
if(err || !headers) {
return callback(err || new Error('All headers not found.'));
self._findCommonAncestor(function(err, commonAncestorHash) {
if(err) {
return callback(err);
}
self._handleReorg(header.hash, headers, callback);
self._handleReorg(commonAncestorHash, callback);
});
});
};
BlockService.prototype._findCommonAncestor = function(callback) {
var self = this;
var hash = self._tip.hash;
self._header.getAllHeaders(function(err, headers) {
if(err || !headers) {
return callback(err || new Error('headers required.'));
}
async.until(function() {
return headers.get(hash);
}, function(next) {
self._getBlock(hash, function(err, block) {
if(err) {
return next(err);
}
hash = bcoin.util.revHex(block.prevBlock);
next();
});
}, function(err) {
if(err) {
return callback(err);
}
callback(null, hash);
});
});
};
BlockService.prototype.start = function(callback) {
var self = this;
@ -216,9 +243,12 @@ BlockService.prototype.start = function(callback) {
return callback(err);
}
self._blockProcessor = async.queue(self._onBlock.bind(self));
self._setListeners();
assert(tip.height >= 0, 'tip is not initialized');
self._setTip(tip);
self._setListeners();
self._bus = self.node.openBus({remoteAddress: 'localhost-block'});
callback();
});
@ -230,9 +260,24 @@ BlockService.prototype.stop = function(callback) {
};
BlockService.prototype.subscribe = function(name, emitter) {
this._subscriptions[name].push(emitter);
log.info(emitter.remoteAddress, 'subscribe:', 'block/' + name, 'total:', this._subscriptions[name].length);
};
BlockService.prototype._queueBlock = function(block) {
var self = this;
self._blockProcessor.push(block, function(err) {
if (err) {
return self._handleError(err);
}
log.debug('Block Service: completed processing block: ' + block.rhash() +
' prev hash: ' + bcoin.util.revHex(block.prevBlock) + ' height: ' + self._tip.height);
});
};
@ -267,48 +312,7 @@ BlockService.prototype._broadcast = function(subscribers, name, entity) {
};
BlockService.prototype._detectReorg = function(block) {
var prevHash = bcoin.util.revHex(block.prevBlock);
if (this._tip.hash !== prevHash) {
return true;
}
return false;
};
BlockService.prototype._getOldHeaders = function(commonAncestorHeader, allHeaders) {
var header = allHeaders.get(this._tip.hash);
assert(header, 'Header needed to find old blocks during reorg.');
var headers = [];
while (header.height > commonAncestorHeader.height) {
headers.push(header);
header = allHeaders.get(header.prevHash);
}
return headers;
};
BlockService.prototype._findCommonAncestor = function(hash, allHeaders, callback) {
// the prev hash of the passed in "hash" should also point to a block who has the same prev hash.
var self = this;
var reorgHeader = allHeaders.get(hash);
assert(reorgHeader, 'No reorg header found, cannot find common ancestor.');
var prevHash = reorgHeader.prevHash;
self.getBlock(prevHash, function(err, block) {
if (err) {
return callback(err);
}
var commonAncestorHeader = allHeaders.get(block.rhash());
callback(null, commonAncestorHeader);
});
return bcoin.util.revHex(block.prevBlock) !== this._tip.hash;
};
BlockService.prototype._getBlock = function(hash, callback) {
@ -354,53 +358,13 @@ BlockService.prototype._getHash = function(blockArg, callback) {
};
BlockService.prototype._handleReorg = function(reorgHash, allHeaders, callback) {
BlockService.prototype.onReorg = function(args, callback) {
var self = this;
var reorgHeader = allHeaders.get(reorgHash);
assert(reorgHeader, 'We were asked to reorg to a non-existent hash.');
var block = args[1][0];
self._findCommonAncestor(reorgHash, allHeaders, function(err, commonAncestorHeader) {
if (err) {
return callback(err);
}
assert(commonAncestorHeader, 'A common ancestor hash was found, but its header could not he found.');
// if we are syncing and we haven't sync'ed to the common ancestor hash, we can safely ignore this reorg
if (self._tip.height < commonAncestorHeader.height) {
return callback();
}
var reorgHeaders = self._getOldHeaders(commonAncestorHeader, allHeaders);
assert(reorgHeaders, 'Expected to have reorg headers to remove');
log.info('Block Service: A common ancestor block was found to at hash: ' + commonAncestorHeader.hash +
' and height: ' + commonAncestorHeader.height + '. Removing a total of: ' + reorgHeaders.length + ' block(s).');
self._processReorg(commonAncestorHeader, reorgHeaders, function(err) {
if(err) {
return callback(err);
}
callback();
});
});
};
// this JUST rewinds the chain back to the common ancestor block, nothing more
BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, callback) {
// set the tip to the common ancestor in case something goes wrong with the reorg
var self = this;
self._setTip({ hash: commonAncestorHeader.hash, height: commonAncestorHeader.height });
self._setTip({ hash: block.rhash(), height: self._tip.height - 1 });
var tipOps = utils.encodeTip(self._tip, self.name);
var removalOps = [{
@ -409,148 +373,249 @@ BlockService.prototype._onReorg = function(commonAncestorHeader, oldBlockList, c
value: tipOps.value
}];
// remove all the old blocks that we reorg from
oldBlockList.forEach(function(block) {
removalOps.push({
type: 'del',
key: self._encoding.encodeBlockKey(block.rhash()),
removalOps.push({
type: 'del',
key: self._encoding.encodeBlockKey(block.rhash()),
});
setImmediate(function() {
callback(null, removalOps);
});
};
BlockService.prototype._onReorg = function(commonAncestorHash, block, callback) {
var self = this;
var services = self.node.services;
async.mapSeries(services, function(service, next) {
if(!service.onReorg) {
return setImmediate(next);
}
service.onReorg.call(service, [commonAncestorHash, [block]], next);
}, callback);
};
BlockService.prototype._removeAllSubscriptions = function() {
this._bus.unsubscribe('p2p/block');
this._bus.removeAllListeners();
this._subscribedBlock = false;
};
BlockService.prototype.onHeaders = function(callback) {
var self = this;
async.retry(function(next) {
next(self._blockProcessor.length() !== 0);
}, function() {
self._checkTip(function(err) {
if(err) {
return callback(err);
}
self._startSync();
callback();
});
});
self._db.batch(removalOps, callback);
};
BlockService.prototype._onAllHeaders = function() {
// once the header service has all of its headers, we know we can check our
// own tip for consistency and make sure our it is on the mainchain
var self = this;
BlockService.prototype._startBlockSubscription = function() {
if (!self._initialSync) {
return self._sync();
if (this._subscribedBlock) {
return;
}
self._checkTip(function(err) {
this._subscribedBlock = true;
if(err) {
log.error(err);
return self.node.stop();
}
self._startSync();
});
log.info('Block Service: starting p2p block subscription.');
this._bus.on('p2p/block', this._queueBlock.bind(this));
this._bus.subscribe('p2p/block');
};
BlockService.prototype._processReorg = function(commonAncestorHeader, reorgHeaders, callback) {
BlockService.prototype._handleReorg = function(commonAncestorHash, callback) {
var self = this;
log.warn('Block Service: chain reorganization detected, current height/hash: ' + self._tip.height + '/' +
self._tip.hash + ' common ancestor hash: ' + commonAncestorHash);
var operations = [];
var services = self.node.services;
var tip = self._tip;
var blockCount = 0;
// process one block at a time just in case we have a huge amount of blocks to back out
async.eachSeries(reorgHeaders, function(header, next) {
// we don't know how many blocks we need to remove until we've reached the common ancestor
async.whilst(
function() {
return tip.hash !== commonAncestorHash;
},
async.waterfall([
function(next) {
self.getBlock(header.hash, next);
},
function(block, next) {
if (!block) {
return next(new Error('block not found for reorg.'));
}
self._timestamp.getTimestamp(header.hash, function(err, timestamp) {
if (err || !timestamp) {
return next(err || new Error('timestamp missing from reorg.'));
}
block.__height = header.height;
block.__ts = timestamp;
function(next) {
async.waterfall([
self._getReorgBlock.bind(self, tip),
function(block, next) {
tip = {
hash: bcoin.util.revHex(block.prevBlock),
height: tip.height - 1
};
next(null, block);
});
},
function(block, next) {
async.eachSeries(services, function(mod, next) {
if(mod.onReorg) {
mod.onReorg.call(mod, [commonAncestorHeader, [block]], function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
self._onReorg(commonAncestorHeader, [block], next);
});
} else {
setImmediate(next);
}
}, next);
}
], next);
}, function(err) {
},
if (err) {
return callback(err);
}
function(block, next) {
self._onReorg(commonAncestorHash, block, next);
}
self._db.batch(operations, callback);
});
};
BlockService.prototype._processBlock = function(block) {
var self = this;
var operations = [];
var services = self.node.services;
async.eachSeries(
services,
function(mod, next) {
if(mod.onBlock) {
mod.onBlock.call(mod, block, function(err, ops) {
if (err) {
return next(err);
}
if (ops) {
operations = operations.concat(ops);
}
next();
});
} else {
setImmediate(next);
}
], function(err, ops) {
if(err) {
return next(err);
}
blockCount++;
operations = operations.concat(ops);
next();
});
},
function(err) {
if (err) {
return self._handleError(err);
return callback(err);
}
self._db.batch(operations, function(err) {
log.info('Block Service: removed ' + blockCount + ' block(s) during the reorganization event.');
self._db.batch(_.compact(_.flattenDeep(operations)), callback);
if (err) {
return self._handleError(err);
}
});
};
self._tip.height = self._tip.height + 1;
self._tip.hash = block.rhash();
var tipOps = utils.encodeTip(self._tip, self.name);
BlockService.prototype._getReorgBlock = function(tip, callback) {
self._db.put(tipOps.key, tipOps.value, function(err) {
var self = this;
if (err) {
return self._handleError(err);
}
self._getBlock(tip.hash, function(err, block) {
self._syncing = false;
self._reorging = false;
self._sync();
});
});
if (err || !block) {
return callback(err || new Error('block not found for reorg.'));
}
);
self._timestamp.getTimestamp(tip.hash, function(err, timestamp) {
if (err || !timestamp) {
return callback(err || new Error('timestamp missing from reorg.'));
}
block.__height = tip.height;
block.__ts = timestamp;
callback(null, block);
});
});
};
BlockService.prototype._onBlock = function(block, callback) {
var self = this;
self._getBlock(block.rhash(), function(err, _block) {
if(err) {
return self._handleError(err);
}
if (_block) {
log.debug('Block Service: not syncing, block already in database.');
return setImmediate(callback);
}
self._processBlock(block, callback);
});
};
BlockService.prototype._processBlock = function(block, callback) {
var self = this;
if (self.node.stopping) {
return callback();
}
log.debug('Block Service: new block: ' + block.rhash());
// common case
if (!self._detectReorg(block)) {
return setImmediate(function() {
self._saveBlock(block, callback);
});
}
// reorg
self._handleReorg(bcoin.util.revHex(block.prevBlock), function(err) {
if(err) {
return callback(err);
}
self._saveBlock(block, callback);
});
};
BlockService.prototype._saveBlock = function(block, callback) {
var self = this;
block.__height = self._tip.height + 1;
var services = self.node.services;
async.mapSeries(services, function(service, next) {
if(!service.onBlock) {
return setImmediate(next);
}
service.onBlock.call(service, block, next);
}, function(err, ops) {
if (err) {
return callback(err);
}
self._db.batch(_.compact(_.flattenDeep(ops)), function(err) {
if (err) {
return callback(err);
}
self._setTip({ hash: block.rhash(), height: self._tip.height + 1 });
var tipOps = utils.encodeTip(self._tip, self.name);
self._db.put(tipOps.key, tipOps.value, function(err) {
if(err) {
return callback(err);
}
callback();
});
});
});
};
BlockService.prototype._onBestHeight = function(height) {
log.info('Block Service: Best Height is: ' + height);
this._removeAllSubscriptions();
};
BlockService.prototype._setListeners = function() {
this._p2p.on('bestHeight', this._onBestHeight.bind(this));
};
BlockService.prototype._handleError = function(err) {
@ -560,6 +625,20 @@ BlockService.prototype._handleError = function(err) {
}
};
BlockService.prototype._syncBlock = function(block) {
var self = this;
self._saveBlock(block, function(err) {
if(err) {
return self._handleError(err);
}
if (self._tip.height < self._header.getLastHeader().height) {
return self.emit('next block');
}
self.emit('synced');
});
};
BlockService.prototype.onBlock = function(block, callback) {
var self = this;
@ -572,117 +651,88 @@ BlockService.prototype.onBlock = function(block, callback) {
});
};
BlockService.prototype.newBlock = function(block) {
if (this.node.stopping || this._tip.hash === block.rhash()) {
return;
}
// this service must receive blocks in order
var prevHash = bcoin.util.revHex(block.prevBlock);
if (this._tip.hash !== prevHash) {
return log.warn('received a block that was not asked for and not linked directly to our tip.');
}
log.debug('Block Service: new block: ' + block.rhash());
block.__height = this._tip.height + 1;
this._processBlock(block);
};
BlockService.prototype._setListeners = function() {
this._header.on('headers', this._onAllHeaders.bind(this));
};
BlockService.prototype.newReorg = function(block, headers, callback) {
var self = this;
self._reorging = true;
log.info('Block Service: detected a reorg from the header service.');
self._handleReorg(block.rhash(), headers, function(err) {
if (err) {
return callback(err);
}
self.newBlock(block);
async.retry({ interval: 100, times: 1000 }, function(next) {
next(self._reorging);
}, function(err) {
if (err) {
return callback(err);
}
callback();
});
});
};
BlockService.prototype._setTip = function(tip) {
log.debug('Block Service: Setting tip to height: ' + tip.height);
log.debug('Block Service: Setting tip to hash: ' + tip.hash);
this._tip = tip;
};
BlockService.prototype._onSynced = function() {
var self = this;
self._logProgress();
self._initialSync = false;
self._startBlockSubscription();
log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
};
BlockService.prototype._startSync = function() {
this._numNeeded = this._header.getLastHeader().height - this._tip.height;
var numNeeded = Math.max(this._header.getLastHeader().height - this._tip.height, 0);
log.info('Block Service: Gathering: ' + this._numNeeded + ' block(s) from the peer-to-peer network.');
log.info('Block Service: Gathering: ' + numNeeded + ' block(s) from the peer-to-peer network.');
if (numNeeded > 0) {
this.on('next block', this._sync.bind(this));
this.on('synced', this._onSynced.bind(this));
return this._sync();
}
this._onSynced();
this._sync();
};
BlockService.prototype._sync = function() {
var self = this;
if (self.node.stopping || self._syncing) {
if (self.node.stopping) {
return;
}
self._syncing = true;
var lastHeaderIndex = self._header.getLastHeader().height;
if (self._tip.height < lastHeaderIndex) {
if (self._tip.height % 144 === 0) {
log.info('Block Service: Blocks download progress: ' +
self._tip.height + '/' + lastHeaderIndex +
' (' + self._syncPercentage() + '%)');
}
return self._header.getNextHash(self._tip, function(err, targetHash, nextHash) {
if(err) {
log.error(err);
self.node.stop();
return;
}
self._header.lastBlockQueried = targetHash;
// to ensure that we can receive blocks that were previously delivered
// this will lead to duplicate transactions being sent
self._p2p.clearInventoryCache();
console.log('target hash: ' + targetHash);
self._p2p.getP2PBlock({
filter: {
startHash: self._tip.hash,
endHash: nextHash
},
blockHash: targetHash
}, self.newBlock.bind(self));
});
if (self._tip.height % 144 === 0) {
self._logProgress();
}
this._syncing = false;
this._initialSync = false;
log.info('Block Service: The best block hash is: ' + self._tip.hash +
' at height: ' + self._tip.height);
self._header.getNextHash(self._tip, function(err, targetHash, nextHash) {
if(err) {
return self._handleError(err);
}
// to ensure that we can receive blocks that were previously delivered
// this will lead to duplicate transactions being sent
self._p2p.clearInventoryCache();
self._p2p.getP2PBlock({
filter: {
startHash: self._tip.hash,
endHash: nextHash
},
blockHash: targetHash
}, self._syncBlock.bind(self));
});
};
BlockService.prototype._logProgress = function() {
if (!this._initialSync) {
return;
}
var progress;
var bestHeight = Math.max(this._header.getBestHeight(), this._tip.height);
if (bestHeight === 0) {
progress = 0;
} else {
progress = (this._tip.height/bestHeight*100.00).toFixed(2);
}
log.info('Block Service: download progress: ' + this._tip.height + '/' +
bestHeight + ' (' + progress + '%)');
};

View File

@ -27,7 +27,6 @@ var HeaderService = function(options) {
this._checkpoint = options.checkpoint || 2000; // set to -1 to resync all headers.
this.GENESIS_HASH = constants.BITCOIN_GENESIS_HASH[this.node.network];
this._lastHeader = null;
this.lastBlockQueried = null;
this._initialSync = true;
};
@ -209,8 +208,6 @@ HeaderService.prototype.start = function(callback) {
self._setListeners();
self._bus = self.node.openBus({remoteAddress: 'localhost-header'});
self._startHeaderSubscription();
callback();
});
@ -230,6 +227,11 @@ HeaderService.prototype.stop = function(callback) {
HeaderService.prototype._startHeaderSubscription = function() {
if (this._subscribedHeaders) {
return;
}
this._subscribedHeaders = true;
log.info('Header Service: subscribed to p2p headers.');
this._bus.on('p2p/headers', this._onHeaders.bind(this));
this._bus.subscribe('p2p/headers');
@ -251,15 +253,11 @@ HeaderService.prototype.getPublishEvents = function() {
HeaderService.prototype._queueBlock = function(block) {
var self = this;
if (block.rhash() === self.lastBlockQueried) {
return;
}
self._blockProcessor.push(block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
return self._handleError(err);
}
log.debug('Header Service: completed processing block: ' + block.rhash() + ' prev hash: ' + bcoin.util.revHex(block.prevBlock));
@ -272,21 +270,21 @@ HeaderService.prototype._processBlocks = function(block, callback) {
var self = this;
assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.');
if (self.node.stopping) {
return callback();
}
self._persistHeader(block, function(err) {
if (err) {
return callback(err);
self.getBlockHeader(block.rhash(), function(err, header) {
if(err) {
return self._handleError(err);
}
async.eachSeries(self.node.services, function(mod, next) {
if (!mod.newBlock) {
return setImmediate(next);
}
mod.newBlock.call(mod, block, next);
}, callback);
if (header) {
log.debug('Header Service: block already exists in data set.');
return callback();
}
self._persistHeader(block, callback);
});
};
@ -302,9 +300,7 @@ HeaderService.prototype._persistHeader = function(block, callback) {
}
if (!commonHeader) {
return self._syncBlock(block, callback);
}
self._handleReorg(block, commonHeader, function(err) {
@ -314,7 +310,6 @@ HeaderService.prototype._persistHeader = function(block, callback) {
}
self._syncBlock(block, callback);
});
});
@ -338,16 +333,7 @@ HeaderService.prototype._syncBlock = function(block, callback) {
log.debug('Header Service: new block: ' + block.rhash());
self._saveHeaders(self._onHeader(header), function(err) {
if (err) {
return callback(err);
}
self._onHeadersSave();
callback();
});
self._saveHeaders(self._onHeader(header), callback);
};
HeaderService.prototype._broadcast = function(block) {
@ -418,17 +404,21 @@ HeaderService.prototype._onHeaders = function(headers) {
self._saveHeaders(dbOps, function(err) {
if (err) {
log.error(err);
return self.node.stop();
return self._handleError(err);
}
self._onHeadersSave();
});
};
HeaderService.prototype._handleError = function(err) {
log.error(err);
this.node.stop();
};
HeaderService.prototype._saveHeaders = function(dbOps, callback) {
var tipOps = utils.encodeTip(this._tip, this.name);
var self = this;
var tipOps = utils.encodeTip(self._tip, self.name);
dbOps.push({
type: 'put',
@ -436,37 +426,52 @@ HeaderService.prototype._saveHeaders = function(dbOps, callback) {
value: tipOps.value
});
this._db.batch(dbOps, callback);
self._db.batch(dbOps, function(err) {
if(err) {
return callback(err);
}
self._onHeadersSave(callback);
});
};
HeaderService.prototype._onHeadersSave = function(err) {
HeaderService.prototype._onHeadersSave = function(callback) {
var self = this;
if (err) {
log.error(err);
self.node.stop();
return;
}
self._logProgress();
if (!self._syncComplete()) {
self._sync();
return;
return callback();
}
self._endHeaderSubscription();
self._startBlockSubscription();
self._setBestHeader();
if (!self._initialSync) {
return callback();
}
log.info('Header Service: sync complete.');
self._initialSync = false;
self.emit('headers');
async.eachSeries(self.node.services, function(service, next) {
if (service.onHeaders) {
return service.onHeaders.call(service, next);
}
setImmediate(next);
}, callback);
};
HeaderService.prototype._endHeaderSubscription = function() {
if (this._subscribedHeaders) {
this._subscribedHeaders = false;
log.info('Header Service: p2p header subscription no longer needed, unsubscribing.');
this._bus.unsubscribe('p2p/headers');
}
};
HeaderService.prototype._startBlockSubscription = function() {
if (this._subscribedBlock) {
@ -475,6 +480,7 @@ HeaderService.prototype._startBlockSubscription = function() {
this._subscribedBlock = true;
log.info('Header Service: starting p2p block subscription.');
this._bus.on('p2p/block', this._queueBlock.bind(this));
this._bus.subscribe('p2p/block');
@ -501,7 +507,6 @@ HeaderService.prototype._getHeader = function(height, hash, callback) {
return callback(new Error('invalid arguments'));
}
var key;
if (hash) {
key = self._encoding.encodeHeaderHashKey(hash);
@ -537,20 +542,7 @@ HeaderService.prototype._detectReorg = function(block, callback) {
return callback(null, false);
}
// first we check if the new block's prev hash is already in our data set
// if it is and this block isn't already in our data set too, then we have a reorg
async.waterfall([
function(next) {
self.getBlockHeader(block.rhash(), next);
},
function(header, next) {
if (header) {
return callback(null, false);
}
self.getBlockHeader(prevHash, next);
}
], function(err, header) {
self.getBlockHeader(prevHash, function(err, header) {
if (err) {
return callback(err);
}
@ -588,20 +580,7 @@ HeaderService.prototype._handleReorg = function(block, commonHeader, callback) {
headers.set(hash, reorgHeader); // appends to the end
// this will ensure our own headers collection is correct
self._onReorg(reorgHeader, headers, commonHeader, function(err) {
if (err) {
return callback(err);
}
async.eachSeries(self.node.services, function(mod, next) {
if (!mod.newReorg) {
return setImmediate(next);
}
mod.newReorg.call(mod, block, headers, next);
}, callback);
});
self._onReorg(reorgHeader, headers, commonHeader, callback);
});
};
@ -611,7 +590,9 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader,
var ops = [];
var startingHeight = this._tip.height;
var hash = this._tip.hash;
var headerCount = 0;
while(hash !== commonHeader.hash) {
headerCount++;
var header = headers.getIndex(startingHeight--);
assert(header, 'Expected to have a header at this height, but did not. Reorg failed.');
hash = header.prevHash;
@ -629,17 +610,16 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader,
this._tip.height = commonHeader.height;
this._lastHeader = commonHeader;
log.info('Header Service: removed ' + headerCount + ' header(s) during the reorganization event.');
this._db.batch(ops, callback);
};
HeaderService.prototype._setListeners = function() {
this._p2p.on('bestHeight', this._onBestHeight.bind(this));
};
HeaderService.prototype._onBestHeight = function(height) {
log.debug('Header Service: Best Height is: ' + height);
log.info('Header Service: Best Height is: ' + height);
this._bestHeight = height;
this._startSync();
};
@ -656,25 +636,37 @@ HeaderService.prototype._startSync = function() {
// ensure the blockProcessor is finished processing blocks (empty queue)
// then proceed with gathering new set(s) of headers
self._bus.unsubscribe('p2p/block');
self._initialSync = true;
log.debug('Header Service: starting sync routines, ensuring no pre-exiting subscriptions to p2p blocks.');
self._removeAllSubscriptions();
async.retry(function(next) {
next(self._blockProcessor.length !== 0);
next(self._blockProcessor.length() !== 0);
}, function() {
self._numNeeded = self._bestHeight - self._tip.height;
var numNeeded = self._bestHeight - self._tip.height;
// common case
if (self._numNeeded > 0) {
log.info('Header Service: Gathering: ' + self._numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
self._sync();
} else if (self._numNeeded < 0) {
// this should be very uncommon
self._handleLowTipHeight();
if (numNeeded > 0) {
log.info('Header Service: Gathering: ' + numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
return self._sync();
}
// next most common case
if (numNeeded === 0) {
log.info('Header Service: we seem to be already synced with the peer.');
return self._onHeadersSave(function(err) {
if(err) {
return self._handleError(err);
}
});
}
// this should be very uncommon
self._handleLowTipHeight();
});
};
@ -682,6 +674,8 @@ HeaderService.prototype._startSync = function() {
HeaderService.prototype._removeAllSubscriptions = function() {
this._bus.unsubscribe('p2p/headers');
this._bus.unsubscribe('p2p/block');
this._subscribedBlock = false;
this._subscribedHeaders = false;
this._bus.removeAllListeners();
};
@ -698,6 +692,8 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) {
return callback(err);
}
log.warn('Header Service: re-subscribing to p2p headers to gather new peer\'s headers.');
self._subscribedHeaders = true;
self._bus.subscribe('p2p/headers');
self._bus.on('p2p/headers', function(headers) {
@ -721,8 +717,6 @@ HeaderService.prototype._findReorgConditionInNewPeer = function(callback) {
if (oldHeader) {
self._removeAllSubscriptions();
// we found a common header, but no headers that at a greater height, this peer is not synced
if (!reorgInfo.blockHash) {
return callback();
@ -755,19 +749,22 @@ HeaderService.prototype._handleLowTipHeight = function() {
self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' +
' Checking the new peer\'s headers for a reorganization event.');
self._removeAllSubscriptions();
self._findReorgConditionInNewPeer(function(err, reorgInfo) {
if (err) {
log.error(err);
return self.node.stop();
return self._handleError(err);
}
// Our peer is not yet sync'ed.
// We will just turn on our block subscription and wait until we get a block we haven't seen
if (!reorgInfo) {
self._onHeadersSave();
log.info('Header Service: it appears that our peer is not yet synchronized with the network ' +
'(we have a strict superset of the peer\'s blocks). We will wait for more blocks to arrive...');
return self._onHeadersSave(function(err) {
if (err) {
return self._handleError(err);
}
});
}
// our peer has reorg'ed to lower overall height.
@ -783,19 +780,15 @@ HeaderService.prototype._handleLowTipHeight = function() {
self._handleReorg(block, reorgInfo.commonHeader, function(err) {
if(err) {
log.error(err);
return self.node.stop();
self._handleError(err);
}
self._syncBlock(block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
self._handleError(err);
}
self._startHeaderSubscription();
});
});
@ -844,6 +837,7 @@ HeaderService.prototype._getP2PHeaders = function(hash) {
HeaderService.prototype._sync = function() {
this._startHeaderSubscription();
this._getP2PHeaders(this._tip.hash);
};

View File

@ -51,8 +51,6 @@ P2P.prototype.getNumberOfPeers = function() {
P2P.prototype.getP2PBlock = function(opts, callback) {
// opts is { filter: {<start and end hashes>}, blockHash: block hash we want }
this._currentRequest = { opts: opts, callback: callback };
var peer = this._getPeer();
var blockFilter = this._setResourceFilter(opts.filter, 'blocks');
@ -251,7 +249,7 @@ P2P.prototype._initPool = function() {
opts.listenAddr = false;
opts.maxPeers = this._maxPeers;
opts.network = this.node.network;
p2p.Pool.RetrySeconds = 5;
p2p.Pool.RetrySeconds = 3;
this._pool = new p2p.Pool(opts);
};
@ -341,13 +339,6 @@ P2P.prototype._onPeerReady = function(peer, addr) {
this._addPeer(peer);
var bestHeight = this._getBestHeight();
// if we have a current request, then we will restart that
if (this._currentRequest) {
log.info('Restarting last query');
this.clearInventoryCache();
this.getP2PBlock(this._currentRequest.opts, this._currentRequest.callback);
}
if (bestHeight >= 0) {
this.emit('bestHeight', bestHeight);
}

View File

@ -121,7 +121,7 @@ TimestampService.prototype.onBlock = function(block, callback) {
TimestampService.prototype.onReorg = function(args, callback) {
var self = this;
var commonAncestorHeader = args[0];
var commonAncestorHash = args[0];
var oldBlockList = args[1];
var removalOps = [];
@ -141,7 +141,7 @@ TimestampService.prototype.onReorg = function(args, callback) {
});
// look up the adjusted timestamp from our own database and set the lastTimestamp to it
self.getTimestamp(commonAncestorHeader.hash, function(err, timestamp) {
self.getTimestamp(commonAncestorHash, function(err, timestamp) {
if (err) {
return callback(err);
@ -158,13 +158,19 @@ TimestampService.prototype.getTimestampSync = function(hash) {
};
TimestampService.prototype.getTimestamp = function(hash, callback) {
var self = this;
self._db.get(self._encoding.encodeBlockTimestampKey(hash), function(err, data) {
if (err) {
return callback(err);
}
if (!data) {
return callback();
}
callback(null, self._encoding.decodeBlockTimestampValue(data));
});
};
TimestampService.prototype.getHash = function(timestamp, callback) {

View File

@ -6,6 +6,7 @@ var sinon = require('sinon');
var bcoin = require('bcoin');
var Block = bcoin.block;
var Encoding = require('../../../lib/services/block/encoding');
var utils = require('../../../lib/utils');
describe('Block Service', function() {
@ -43,53 +44,24 @@ describe('Block Service', function() {
});
});
describe('#_getOldBlocks', function() {
it('should get the old block list (blocks to be removed from indexes)', function(done) {
blockService._tip = { height: 10, hash: 'cc' };
var header = { height: 8, hash: 'aa' };
var getStub = sandbox.stub();
getStub.onCall(0).returns({ hash: 'bb', height: 9 });
getStub.onCall(1).returns({ hash: 'aa', height: 8 });
var allHeaders = { get: getStub };
sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, {});
blockService._timestamp = { getTimestamp: sinon.stub().callsArgWith(1, null, 1234) };
blockService._getOldBlocks(header, allHeaders, function(err, blocks) {
if (err) {
return done(err);
}
expect(blocks.length).to.equal(1);
expect(blocks[0].__height).to.equal(9);
expect(blocks[0].__ts).to.equal(1234);
done();
});
});
});
describe('#_findCommonAncestor', function() {
it('should find the common ancestor between the current chain and the new chain', function(done) {
sandbox.stub(blockService, 'getBlock').callsArgWith(1, null, block1);
sandbox.stub(blockService, '_getOldBlocks').callsArgWith(2, null, [block2]);
sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, block2);
blockService._tip = { hash: 'aa' };
var headers = new utils.SimpleMap();
headers.set('aa', 'someheader');
var allHeaders = { get: sandbox.stub().returns({ hash: 'somecommonancestor', prevHash: block1.rhash() }), size: 1e6 };
blockService._header = { getAllHeaders: sandbox.stub().callsArgWith(0, null, headers) };
blockService._findCommonAncestor('aa', allHeaders, function(err, commonAncestorHashActual, oldBlockList) {
blockService._findCommonAncestor(function(err, commonAncestorHashActual) {
if(err) {
return done(err);
}
expect(commonAncestorHashActual).to.equal('somecommonancestor');
expect(oldBlockList).to.deep.equal([block2]);
expect(commonAncestorHashActual).to.equal('aa');
done();
});
@ -127,31 +99,40 @@ describe('Block Service', function() {
describe('#_onBlock', function() {
it('should process blocks', function() {
var processBlock = sandbox.stub(blockService, '_processBlock');
blockService._tip = { hash: block1.rhash(), height: 1 };
blockService._header = { blockServiceSyncing: false };
blockService._onBlock(block2);
expect(processBlock.calledOnce).to.be.true;
it('should process blocks', function(done) {
var getBlock = sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, null);
var processBlock = sandbox.stub(blockService, '_processBlock').callsArgWith(1, null);
blockService._onBlock(block2, function(err) {
if(err) {
return done(err);
}
expect(processBlock.calledOnce).to.be.true;
expect(getBlock.calledOnce).to.be.true;
done();
});
});
it('should not process blocks', function() {
it('should not process blocks', function(done) {
var getBlock = sandbox.stub(blockService, '_getBlock').callsArgWith(1, null, block2);
var processBlock = sandbox.stub(blockService, '_processBlock');
blockService._tip = { hash: block2.rhash(), height: 1 };
blockService._header = { blockServiceSyncing: false };
blockService._onBlock(block1);
expect(processBlock.calledOnce).to.be.false;
blockService._onBlock(block2, function(err) {
if(err) {
return done(err);
}
expect(getBlock.calledOnce).to.be.true;
expect(processBlock.called).to.be.false;
done();
});
});
});
describe('#_setListeners', function() {
it('should set listeners for headers, reorg', function() {
it('should set listeners for best height', function() {
var on = sandbox.stub();
blockService._header = { on: on };
blockService._p2p = { on: on };
blockService._setListeners();
expect(on.calledTwice).to.be.true;
expect(on.calledOnce).to.be.true;
});
});
@ -167,24 +148,10 @@ describe('Block Service', function() {
});
describe('#_startSubscriptions', function() {
it('should start the subscriptions if not already subscribed', function() {
var on = sinon.stub();
var subscribe = sinon.stub();
var openBus = sinon.stub().returns({ on: on, subscribe: subscribe });
blockService.node = { openBus: openBus };
blockService._startSubscriptions();
expect(blockService._subscribed).to.be.true;
expect(openBus.calledOnce).to.be.true;
expect(on.calledOnce).to.be.true;
expect(subscribe.calledOnce).to.be.true;
});
});
describe('#_startSync', function() {
it('should start the sync of blocks if type set', function() {
blockService._header = { getLastHeader: sinon.stub.returns({ height: 100 }) };
it('should start the sync of blocks', function() {
blockService._header = { getLastHeader: sinon.stub().returns({ height: 100 }) };
blockService._tip = { height: 98 };
var sync = sandbox.stub(blockService, '_sync');
blockService._startSync();
@ -200,6 +167,7 @@ describe('Block Service', function() {
var getServiceTip = sandbox.stub().callsArgWith(1, null, { height: 1, hash: 'aa' });
var setListeners = sandbox.stub(blockService, '_setListeners');
var setTip = sandbox.stub(blockService, '_setTip');
blockService.node = { openBus: sandbox.stub() };
blockService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip };
blockService.start(function() {
expect(blockService._encoding).to.be.an.instanceof(Encoding);

View File

@ -51,7 +51,6 @@ describe('Header Service', function() {
headerService._db = { getPrefix: getPrefix, getServiceTip: getServiceTip, batch: sinon.stub() };
headerService.start(function() {
expect(_startHeaderSubscription.calledOnce).to.be.true;
expect(setGenesisBlock.calledOnce).to.be.true;
expect(getLastHeader.calledOnce).to.be.false;
expect(setListeners.calledOnce).to.be.true;
@ -101,31 +100,27 @@ describe('Header Service', function() {
describe('#_startSync', function() {
it('should start the sync process', function() {
headerService._bestHeight = 123;
var unsub = sinon.stub();
headerService._tip = { height: 120 };
headerService._bus = { unsubscribe: unsub };
headerService._blockProcessor = { length: 0 };
var getHeaders = sandbox.stub();
headerService._p2p = { getHeaders: getHeaders };
headerService._lastHeader = { height: 124 };
var removeAllSubs = sandbox.stub(headerService, '_removeAllSubscriptions');
headerService._blockProcessor = { length: sinon.stub().returns(0) };
headerService._bestHeight = 100;
headerService._tip = { height: 98 };
var sync = sandbox.stub(headerService, '_sync');
headerService._startSync();
expect(unsub.calledOnce).to.be.true;
expect(getHeaders.calledOnce).to.be.true;
expect(removeAllSubs.calledOnce).to.be.true;
expect(sync.calledOnce).to.be.true;
});
});
describe('#_sync', function() {
it('should sync header', function() {
headerService._numNeeded = 1000;
headerService._tip = { height: 121, hash: 'a' };
var getHeaders = sandbox.stub();
headerService._p2p = { getHeaders: getHeaders };
headerService._lastHeader = { height: 124 };
it('should sync headers', function() {
var startHeaderSub = sandbox.stub(headerService, '_startHeaderSubscription');
var getP2PHeaders = sandbox.stub(headerService, '_getP2PHeaders');
headerService._tip = { hash: 'aa' };
headerService._sync();
expect(getHeaders.calledOnce).to.be.true;
expect(getP2PHeaders.calledOnce).to.be.true;
expect(startHeaderSub.calledOnce).to.be.true;
});
});