move queue to the chain.

This commit is contained in:
Christopher Jeffrey 2016-02-17 12:52:22 -08:00
parent d1b470805e
commit a485a1d334
2 changed files with 56 additions and 52 deletions

View File

@ -41,6 +41,8 @@ function Chain(options) {
this.tip = null;
this.mempool = options.mempool;
this.blockdb = options.blockdb;
this.locked = false;
this.pending = [];
this.orphan = {
map: {},
@ -784,10 +786,12 @@ Chain.prototype.add = function add(initial, peer, callback) {
var code = Chain.codes.unchanged;
var total = 0;
callback = utils.asyncify(callback);
if (this.locked) {
this.pending.push([initial, peer, callback]);
return;
}
if (this._locked)
return callback(null, total);
this.locked = true;
(function next(block) {
var hash = block.hash('hex');
@ -891,8 +895,11 @@ Chain.prototype.add = function add(initial, peer, callback) {
if (existing) {
// We already have this block. Do regular
// orphan resolution (won't do anything).
// NOTE: Wrap this in a nextTick to avoid
// a stack overflow if there are a lot of
// existing blocks.
if (existing.hash === hash)
return handleOrphans();
return utils.nextTick(handleOrphans);
// A valid block with an already existing
// height came in, that spells fork. We
@ -912,19 +919,21 @@ Chain.prototype.add = function add(initial, peer, callback) {
// The block has equal chainwork (an
// alternate tip). Reset the chain, find
// a new peer, and wait to see who wins.
self._locked = true;
return self._removeBlock(existing.hash, function(err) {
self._locked = false;
if (err)
return done(err);
self.resetHeight(entry.height - 1);
self.resetHeight(existing.height - 1);
self.emit('fork', {
height: prevHeight + 1,
height: existing.height,
expected: existing.hash,
received: hash,
checkpoint: false
}, peer);
code = Chain.codes.forked;
return done(null, code);
});
}
@ -1007,6 +1016,8 @@ Chain.prototype.add = function add(initial, peer, callback) {
})(initial);
function done(err, code) {
var item;
// Failsafe for large orphan chains. Do not
// allow more than 20mb stored in memory.
if (self.orphan.size > 20971520) {
@ -1021,15 +1032,33 @@ Chain.prototype.add = function add(initial, peer, callback) {
}
if (code !== Chain.codes.okay) {
if (0)
if (!(self.options.multiplePeers && code === Chain.codes.newOrphan))
utils.debug('Chain Error: %s', Chain.msg(code));
}
if (err)
return callback(err);
// We intentionally did not asyncify the
// callback so if it calls chain.add, it
// still gets added to the queue. The
// chain.add below needs to be in a nextTick
// so we don't cause a stack overflow if
// these end up being all sync chain.adds.
utils.nextTick(function() {
if (err)
callback(err);
else
callback(null, total);
return callback(null, total);
self.locked = false;
// Start resolving the queue
// (I love asynchronous IO).
if (self.pending.length === 0)
return;
item = self.pending.shift();
self.chain.add(item[0], item[1], item[2]);
});
}
};

View File

@ -84,9 +84,6 @@ function Pool(options) {
interval: options.loadInterval || 5000
};
this._pendingBlocks = [];
this._locked = false;
this.requestTimeout = options.requestTimeout || 600000;
this.chain = new bcoin.chain({
@ -609,35 +606,13 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback)
Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
var self = this;
if (this._locked) {
this._pendingBlocks.push([block, peer, callback]);
return;
}
this._locked = true;
callback = utils.asyncify(callback);
function done(err, result) {
var item;
callback(err, result);
self._locked = false;
if (self._pendingBlocks.length === 0)
return;
item = self._pendingBlocks.shift();
self._handleBlock(item[0], item[1], item[2]);
}
this._prehandleBlock(block, peer, function(err) {
var requested;
if (err)
return done(err);
return callback(err);
// Fulfill our request.
requested = self._response(block);
@ -655,7 +630,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (self.block.invalid[block.hash('hex')]) {
utils.debug('Peer is sending an invalid chain (%s)', peer.host);
self.setMisbehavior(peer, 100);
return done(null, false);
return callback(null, false);
}
// Ensure this is not a continuation
@ -665,7 +640,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
'Peer is sending an invalid continuation chain (%s)',
peer.host);
self.setMisbehavior(peer, 100);
return done(null, false);
return callback(null, false);
}
// Ignore if we already have.
@ -673,7 +648,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug('Already have block %s (%s)',
self.chain.getHeight(block), peer.host);
self.setMisbehavior(peer, 1);
return done(null, false);
return callback(null, false);
}
// Make sure the block is valid.
@ -683,7 +658,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
block.rhash, peer.host);
self.block.invalid[block.hash('hex')] = true;
self.setMisbehavior(peer, 100);
return done(null, false);
return callback(null, false);
}
// Someone is sending us blocks without
@ -699,14 +674,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return done(null, false);
return callback(null, false);
// Make sure the peer doesn't send us
// more than 200 orphans every 3 minutes.
if (self.isOrphaning(peer)) {
utils.debug('Peer is orphaning (%s)', peer.host);
self.setMisbehavior(peer, 100);
return done(null, false);
return callback(null, false);
}
// NOTE: If we were to emit new orphans here, we
@ -715,7 +690,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
// the height until later.
self._addIndex(block, peer, function(err, added) {
if (err)
return done(err);
return callback(err);
// Resolve orphan chain.
self.peers.load.loadBlocks(
@ -725,7 +700,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug('Handled orphan %s (%s)', block.rhash, peer.host);
return done(null, false);
return callback(null, false);
});
return;
@ -734,12 +709,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (!self.chain.hasBlock(block.prevBlock)) {
// Special case for genesis block.
if (block.isGenesis())
return done(null, false);
return callback(null, false);
// Increase banscore by 10 if we're using getheaders.
if (!self.options.multiplePeers) {
if (self.setMisbehavior(peer, 10))
return done(null, false);
return callback(null, false);
}
}
}
@ -747,12 +722,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
// Add to index and emit/save
self._addIndex(block, peer, function(err, added) {
if (err)
return done(err);
return callback(err);
if (added)
return done(null, true);
return callback(null, true);
return done(null, false);
return callback(null, false);
});
});
};
@ -782,7 +757,7 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) {
peer._blockQueue.length,
self.chain.currentTarget(),
self.peers.all.length,
self._pendingBlocks.length);
self.chain.pending.length);
}
return callback(null, true);