add queue back to handleBlock.

This commit is contained in:
Christopher Jeffrey 2016-02-17 13:04:26 -08:00
parent a208083dc5
commit 7a04ac25d0
2 changed files with 43 additions and 17 deletions

View File

@ -43,6 +43,7 @@ function Chain(options) {
this.blockdb = options.blockdb; this.blockdb = options.blockdb;
this.locked = false; this.locked = false;
this.pending = []; this.pending = [];
this.orphanLimit = options.orphanLimit || 20 * 1024 * 1024;
this.orphan = { this.orphan = {
map: {}, map: {},
@ -1019,7 +1020,7 @@ Chain.prototype.add = function add(initial, peer, callback) {
// Failsafe for large orphan chains. Do not // Failsafe for large orphan chains. Do not
// allow more than 20mb stored in memory. // allow more than 20mb stored in memory.
if (self.orphan.size > 20971520) { if (self.orphan.size > self.orphanLimit) {
Object.keys(self.orphan.bmap).forEach(function(hash) { Object.keys(self.orphan.bmap).forEach(function(hash) {
self.emit('unresolved', self.orphan.bmap[hash], peer); self.emit('unresolved', self.orphan.bmap[hash], peer);
}); });

View File

@ -84,6 +84,9 @@ function Pool(options) {
interval: options.loadInterval || 5000 interval: options.loadInterval || 5000
}; };
this._pendingBlocks = [];
this._locked = false;
this.requestTimeout = options.requestTimeout || 600000; this.requestTimeout = options.requestTimeout || 600000;
this.chain = new bcoin.chain({ this.chain = new bcoin.chain({
@ -606,13 +609,35 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback)
Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
var self = this; var self = this;
callback = utils.asyncify(callback); if (this._locked) {
this._pendingBlocks.push([block, peer, callback]);
return;
}
this._locked = true;
function done(err, result) {
var item;
utils.nextTick(function() {
callback(err, result);
self._locked = false;
if (self._pendingBlocks.length === 0)
return;
item = self._pendingBlocks.shift();
self._handleBlock(item[0], item[1], item[2]);
});
}
this._prehandleBlock(block, peer, function(err) { this._prehandleBlock(block, peer, function(err) {
var requested; var requested;
if (err) if (err)
return callback(err); return done(err);
// Fulfill our request. // Fulfill our request.
requested = self._response(block); requested = self._response(block);
@ -622,7 +647,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (self.block.invalid[block.hash('hex')]) { if (self.block.invalid[block.hash('hex')]) {
utils.debug('Peer is sending an invalid chain (%s)', peer.host); utils.debug('Peer is sending an invalid chain (%s)', peer.host);
self.setMisbehavior(peer, 100); self.setMisbehavior(peer, 100);
return callback(null, false); return done(null, false);
} }
// Ensure this is not a continuation // Ensure this is not a continuation
@ -632,7 +657,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
'Peer is sending an invalid continuation chain (%s)', 'Peer is sending an invalid continuation chain (%s)',
peer.host); peer.host);
self.setMisbehavior(peer, 100); self.setMisbehavior(peer, 100);
return callback(null, false); return done(null, false);
} }
// Ignore if we already have. // Ignore if we already have.
@ -640,7 +665,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug('Already have block %s (%s)', utils.debug('Already have block %s (%s)',
self.chain.getHeight(block), peer.host); self.chain.getHeight(block), peer.host);
self.setMisbehavior(peer, 1); self.setMisbehavior(peer, 1);
return callback(null, false); return done(null, false);
} }
// Make sure the block is valid. // Make sure the block is valid.
@ -650,7 +675,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
block.rhash, peer.host); block.rhash, peer.host);
self.block.invalid[block.hash('hex')] = true; self.block.invalid[block.hash('hex')] = true;
self.setMisbehavior(peer, 100); self.setMisbehavior(peer, 100);
return callback(null, false); return done(null, false);
} }
// Someone is sending us blocks without // Someone is sending us blocks without
@ -666,14 +691,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (!self.chain.hasBlock(block.prevBlock)) { if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block. // Special case for genesis block.
if (block.isGenesis()) if (block.isGenesis())
return callback(null, false); return done(null, false);
// Make sure the peer doesn't send us // Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes. // more than 200 orphans every 3 minutes.
if (self.isOrphaning(peer)) { if (self.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host); utils.debug('Peer is orphaning (%s)', peer.host);
self.setMisbehavior(peer, 100); self.setMisbehavior(peer, 100);
return callback(null, false); return done(null, false);
} }
// NOTE: If we were to emit new orphans here, we // NOTE: If we were to emit new orphans here, we
@ -682,7 +707,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
// the height until later. // the height until later.
self._addIndex(block, peer, function(err, added) { self._addIndex(block, peer, function(err, added) {
if (err) if (err)
return callback(err); return done(err);
// Resolve orphan chain. // Resolve orphan chain.
self.peers.load.loadBlocks( self.peers.load.loadBlocks(
@ -692,7 +717,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug('Handled orphan %s (%s)', block.rhash, peer.host); utils.debug('Handled orphan %s (%s)', block.rhash, peer.host);
return callback(null, false); return done(null, false);
}); });
return; return;
@ -701,12 +726,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (!self.chain.hasBlock(block.prevBlock)) { if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block. // Special case for genesis block.
if (block.isGenesis()) if (block.isGenesis())
return callback(null, false); return done(null, false);
// Increase banscore by 10 if we're using getheaders. // Increase banscore by 10 if we're using getheaders.
if (!self.options.multiplePeers) { if (!self.options.multiplePeers) {
if (self.setMisbehavior(peer, 10)) if (self.setMisbehavior(peer, 10))
return callback(null, false); return done(null, false);
} }
} }
} }
@ -714,12 +739,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
// Add to index and emit/save // Add to index and emit/save
self._addIndex(block, peer, function(err, added) { self._addIndex(block, peer, function(err, added) {
if (err) if (err)
return callback(err); return done(err);
if (added) if (added)
return callback(null, true); return done(null, true);
return callback(null, false); return done(null, false);
}); });
}); });
}; };
@ -749,7 +774,7 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) {
peer._blockQueue.length, peer._blockQueue.length,
self.chain.currentTarget(), self.chain.currentTarget(),
self.peers.all.length, self.peers.all.length,
self.chain.pending.length); self._pendingBlocks.length);
} }
return callback(null, true); return callback(null, true);