Fixed edge case where new peer has unexpectedly low number of blocks.

This commit is contained in:
Chris Kleeschulte 2017-09-19 08:48:49 -04:00
parent de163ad4c9
commit 66e82a3fe7
No known key found for this signature in database
GPG Key ID: 33195D27EF6BDB7F
3 changed files with 166 additions and 63 deletions

View File

@ -470,6 +470,14 @@ BlockService.prototype._onAllHeaders = function() {
// once the header service has all of its headers, we know we can check our
// own tip for consistency and make sure our it is on the mainchain
var self = this;
if (self._syncing) {
return;
}
if (!self._initialSync) {
return self._sync();
}
self._checkTip(function(err) {
if(err) {
@ -480,6 +488,7 @@ BlockService.prototype._onAllHeaders = function() {
self._startSync();
});
};
@ -606,7 +615,7 @@ BlockService.prototype.onBlock = function(block, callback) {
BlockService.prototype._onBlock = function(block) {
if (this.node.stopping || this._reorging) {
if (this.node.stopping || this._reorging || this._tip.hash === block.rhash()) {
return;
}
@ -629,7 +638,7 @@ BlockService.prototype._setListeners = function() {
var self = this;
self._header.once('headers', self._onAllHeaders.bind(self));
self._header.on('headers', self._onAllHeaders.bind(self));
self._header.on('reorg', function(block, headers) {
@ -655,7 +664,7 @@ BlockService.prototype._setListeners = function() {
}, function(err) {
if (err) {
log.error(err);
self.nodes.stop();
self.node.stop();
}
});

View File

@ -30,7 +30,6 @@ var HeaderService = function(options) {
this.blockServiceSyncing = true;
this.lastBlockQueried = null;
this._initialSync = true;
this._blockQueue = [];
};
inherits(HeaderService, BaseService);
@ -206,6 +205,9 @@ HeaderService.prototype.start = function(callback) {
return callback(err);
}
// set block worker queue, concurrency 1
self._blockProcessor = async.queue(self._processBlocks.bind(self));
self._setListeners();
self._bus = self.node.openBus({remoteAddress: 'localhost-header'});
self._startHeaderSubscription();
@ -223,11 +225,6 @@ HeaderService.prototype.stop = function(callback) {
this._headerInterval = null;
}
if (this._blockProcessor) {
clearInterval(this._blockProcessor);
this._blockProcessor = null;
}
callback();
};
@ -252,56 +249,43 @@ HeaderService.prototype.getPublishEvents = function() {
};
// block handler for blocks from the p2p network
HeaderService.prototype._queueBlock = function(block) {
// this block was queried by the block service and, thus, we already have it
if (block.rhash() === this.lastBlockQueried) {
return;
}
// queue the block for _processBlock to process later
// we won't start processing blocks until we have all of our headers
// so if loading headers takes a really long time, we could have a long
// list of blocks. Warning will be logged if this happens.
this._blockQueue.push(block);
};
// we need this in case there is a deluge of blocks from a peer, asynchronously
HeaderService.prototype._processBlocks = function() {
var self = this;
var block = self._blockQueue.shift();
if (self._blockQueue.length > 2) {
// normally header sync is pretty quick, within a minute or two.
// under normal circumstances, we won't queue many blocks
log.warn('Header Service: Block queue has: ' + self._blockQueue.length + ' items.');
}
if (!block) {
if (block.rhash() === self.lastBlockQueried) {
return;
}
assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.');
clearInterval(self._blockProcessor);
self._blockProcessor = null;
self._persistHeader(block, function(err) {
self._blockProcessor.push(block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
}
log.info('Header Service: completed processing block: ' + block.rhash() + ' prev hash: ' + bcoin.util.revHex(block.prevBlock));
});
};
HeaderService.prototype._processBlocks = function(block, callback) {
var self = this;
assert(block.rhash() !== self._lastHeader.hash, 'Trying to save a header that has already been saved.');
self._persistHeader(block, function(err) {
if (err) {
return callback(err);
}
if (!self.blockServiceSyncing) {
self._broadcast(block);
}
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
callback();
});
@ -324,7 +308,7 @@ HeaderService.prototype._persistHeader = function(block, callback) {
}
log.warn('Header Service: Reorganization detected, current tip hash: ' +
self._tip.hash + ' new block causing the reorg: ' + block.rhash() +
self._tip.hash + ', new block causing the reorg: ' + block.rhash() +
' common ancestor hash: ' + commonHeader.hash);
self._handleReorg(block, commonHeader, function(err) {
@ -389,6 +373,7 @@ HeaderService.prototype._onHeader = function(header) {
header.timestamp = header.time;
}
console.log('about to set last header from onHeader function');
this._lastHeader = header;
return [
@ -425,6 +410,8 @@ HeaderService.prototype._onHeaders = function(headers) {
header = header.toObject();
assert(self._lastHeader.hash === header.prevHash, 'headers not in order: ' + self._lastHeader.hash + ' -and- ' + header.prevHash + ' Last header at height: ' + self._lastHeader.height);
var ops = self._onHeader(header);
dbOps = dbOps.concat(ops);
@ -475,11 +462,6 @@ HeaderService.prototype._onHeadersSave = function(err) {
}
if (self._initialSync) {
// we'll turn the block processor on right after we've sync'ed headers
self._blockProcessor = setInterval(self._processBlocks.bind(self), 1000);
}
self._startBlockSubscription();
self._setBestHeader();
@ -635,6 +617,7 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader,
// setting our tip to the common ancestor
this._tip.hash = commonHeader.hash;
this._tip.height = commonHeader.height;
console.log('about to set last header from reorg function');
this._lastHeader = commonHeader;
this._db.batch(ops, callback);
@ -642,13 +625,11 @@ HeaderService.prototype._onReorg = function(reorgHeader, headers, commonHeader,
HeaderService.prototype._setListeners = function() {
this._p2p.once('bestHeight', this._onBestHeight.bind(this));
this._p2p.on('bestHeight', this._onBestHeight.bind(this));
};
HeaderService.prototype._onBestHeight = function(height) {
assert(height >= this._tip.height, 'Our peer does not seem to be fully synced: best height: ' +
height + ' tip height: ' + this._tip.height);
log.debug('Header Service: Best Height is: ' + height);
this._bestHeight = height;
this._startSync();
@ -656,11 +637,122 @@ HeaderService.prototype._onBestHeight = function(height) {
HeaderService.prototype._startSync = function() {
this._numNeeded = this._bestHeight - this._tip.height;
var self = this;
// if our tip height is less than the best height of this peer, then:
// 1. the peer is not fully synced.
// 2. the peer has reorg'ed and we need to handle this
log.info('Header Service: Gathering: ' + this._numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
// unsub from listening for blocks
// ensure the blockProcessor is finished processing blocks (empty queue)
// then proceed with gathering new set(s) of headers
self._bus.unsubscribe('p2p/block');
async.retry(function(next) {
next(self._blockProcessor.length !== 0);
}, function() {
self._numNeeded = self._bestHeight - self._tip.height;
if (self._numNeeded > 0) {
log.info('Header Service: Gathering: ' + self._numNeeded + ' ' + 'header(s) from the peer-to-peer network.');
self._sync();
} else if (self._numNeeded < 0) {
self._handleLowTipHeight();
}
});
};
HeaderService.prototype._findReorgConditionInNewPeer = function(callback) {
var self = this;
var allHeaders = ;
var headerCount = 0;
var tipHash = self.GENESIS_HASH;
self._bus.removeAllListeners();
self._bus.on('p2p/headers', function(headers) {
headers.forEach(function(header) {
allHeaders[header.hash] = header.prevHash;
headerCount++;
});
if (headerCount < self._bestHeight) {
return self._p2p.getHeaders({ startHash: tipHash });
}
self.getAllHeaders(function(err, allHeaders) {
if (err) {
return callback(err);
}
if (allHeaders.get(
});
});
};
HeaderService.prototype._handleLowTipHeight = function() {
var self = this;
log.warn('Header Service: Connected Peer has a best height (' + self._bestHeight + ') which is lower than our tip height (' +
self._tip.height + '). This means that this peer is not fully synchronized with the network -or- the peer has reorganized itself.' +
' Checking the new peer\'s header for a reorganization event.');
self._findReorgConditionInNewPeer(function(err, reorgInfo) {
if (err) {
log.error(err);
return self.node.stop();
}
// case 1: there is a reorg condition in the new peer
if (reorgInfo) {
return self._p2p.getP2PBlock({
filter: {
startHash: reorgInfo.commonHeader.hash,
endHash: 0
},
blockHash: reorgInfo.blockHash
}, function(block) {
self._handleReorg(block, reorgInfo.commonHeader, function(err) {
if(err) {
log.error(err);
return self.node.stop();
}
self._syncBlock(block, function(err) {
if (err) {
log.error(err);
return self.node.stop();
}
self._sync();
});
});
});
}
// case 2: the new peer is just not synced up, so we must wait
self._sync();
});
this._sync();
};
@ -690,12 +782,14 @@ HeaderService.prototype._sync = function() {
self._p2p.getHeaders({ startHash: self._tip.hash });
// when connecting to a peer that isn't yet responding to getHeaders, we will start a interval timer
// to retry until we can get headers, this may be a very long interval
self._headerInterval = setInterval(function() {
log.info('Header Service: retrying get headers since ' + self._tip.hash);
self._p2p.getHeaders({ startHash: self._tip.hash });
}, 2000);
if (!self._headerInterval) {
// when connecting to a peer that isn't yet responding to getHeaders, we will start a interval timer
// to retry until we can get headers, this may be a very long interval
self._headerInterval = setInterval(function() {
log.info('Header Service: retrying get headers since ' + self._tip.hash);
self._p2p.getHeaders({ startHash: self._tip.hash });
}, 2000);
}
};

View File

@ -51,7 +51,7 @@ P2P.prototype.getNumberOfPeers = function() {
P2P.prototype.getP2PBlock = function(opts, callback) {
// opts is { filter: {<start and end hashes>}, blockHash: block hash we want }
this._currentRequest = opts.filter;
this._currentRequest = { opts: opts, callback: callback };
var peer = this._getPeer();
@ -344,7 +344,7 @@ P2P.prototype._onPeerReady = function(peer, addr) {
if (this._currentRequest) {
log.info('Restarting last query');
this.clearInventoryCache();
this.getP2PBlock(this._currentRequest);
this.getP2PBlock(this._currentRequest.opts, this._currentRequest.callback);
}
if (bestHeight >= 0) {