From a485a1d33462aa7519aa871352525263c8451bdb Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 17 Feb 2016 12:52:22 -0800 Subject: [PATCH] move queue to the chain. --- lib/bcoin/chain.js | 53 ++++++++++++++++++++++++++++++++++---------- lib/bcoin/pool.js | 55 +++++++++++++--------------------------------- 2 files changed, 56 insertions(+), 52 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 73c863a5..82fb097b 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -41,6 +41,8 @@ function Chain(options) { this.tip = null; this.mempool = options.mempool; this.blockdb = options.blockdb; + this.locked = false; + this.pending = []; this.orphan = { map: {}, @@ -784,10 +786,12 @@ Chain.prototype.add = function add(initial, peer, callback) { var code = Chain.codes.unchanged; var total = 0; - callback = utils.asyncify(callback); + if (this.locked) { + this.pending.push([initial, peer, callback]); + return; + } - if (this._locked) - return callback(null, total); + this.locked = true; (function next(block) { var hash = block.hash('hex'); @@ -891,8 +895,11 @@ Chain.prototype.add = function add(initial, peer, callback) { if (existing) { // We already have this block. Do regular // orphan resolution (won't do anything). + // NOTE: Wrap this in a nextTick to avoid + // a stack overflow if there are a lot of + // existing blocks. if (existing.hash === hash) - return handleOrphans(); + return utils.nextTick(handleOrphans); // A valid block with an already existing // height came in, that spells fork. We @@ -912,19 +919,21 @@ Chain.prototype.add = function add(initial, peer, callback) { // The block has equal chainwork (an // alternate tip). Reset the chain, find // a new peer, and wait to see who wins. - self._locked = true; return self._removeBlock(existing.hash, function(err) { - self._locked = false; if (err) return done(err); - self.resetHeight(entry.height - 1); + + self.resetHeight(existing.height - 1); + self.emit('fork', { - height: prevHeight + 1, + height: existing.height, expected: existing.hash, received: hash, checkpoint: false }, peer); + code = Chain.codes.forked; + return done(null, code); }); } @@ -1007,6 +1016,8 @@ Chain.prototype.add = function add(initial, peer, callback) { })(initial); function done(err, code) { + var item; + // Failsafe for large orphan chains. Do not // allow more than 20mb stored in memory. if (self.orphan.size > 20971520) { @@ -1021,15 +1032,33 @@ Chain.prototype.add = function add(initial, peer, callback) { } if (code !== Chain.codes.okay) { - if (0) if (!(self.options.multiplePeers && code === Chain.codes.newOrphan)) utils.debug('Chain Error: %s', Chain.msg(code)); } - if (err) - return callback(err); + // We intentionally did not asyncify the + // callback so if it calls chain.add, it + // still gets added to the queue. The + // chain.add below needs to be in a nextTick + // so we don't cause a stack overflow if + // these end up being all sync chain.adds. + utils.nextTick(function() { + if (err) + callback(err); + else + callback(null, total); - return callback(null, total); + self.locked = false; + + // Start resolving the queue + // (I love asynchronous IO). + if (self.pending.length === 0) + return; + + item = self.pending.shift(); + + self.chain.add(item[0], item[1], item[2]); + }); } }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index a0071fdc..ed760e12 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -84,9 +84,6 @@ function Pool(options) { interval: options.loadInterval || 5000 }; - this._pendingBlocks = []; - this._locked = false; - this.requestTimeout = options.requestTimeout || 600000; this.chain = new bcoin.chain({ @@ -609,35 +606,13 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback) Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; - if (this._locked) { - this._pendingBlocks.push([block, peer, callback]); - return; - } - - this._locked = true; - callback = utils.asyncify(callback); - function done(err, result) { - var item; - - callback(err, result); - - self._locked = false; - - if (self._pendingBlocks.length === 0) - return; - - item = self._pendingBlocks.shift(); - - self._handleBlock(item[0], item[1], item[2]); - } - this._prehandleBlock(block, peer, function(err) { var requested; if (err) - return done(err); + return callback(err); // Fulfill our request. requested = self._response(block); @@ -655,7 +630,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (self.block.invalid[block.hash('hex')]) { utils.debug('Peer is sending an invalid chain (%s)', peer.host); self.setMisbehavior(peer, 100); - return done(null, false); + return callback(null, false); } // Ensure this is not a continuation @@ -665,7 +640,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { 'Peer is sending an invalid continuation chain (%s)', peer.host); self.setMisbehavior(peer, 100); - return done(null, false); + return callback(null, false); } // Ignore if we already have. @@ -673,7 +648,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { utils.debug('Already have block %s (%s)', self.chain.getHeight(block), peer.host); self.setMisbehavior(peer, 1); - return done(null, false); + return callback(null, false); } // Make sure the block is valid. @@ -683,7 +658,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { block.rhash, peer.host); self.block.invalid[block.hash('hex')] = true; self.setMisbehavior(peer, 100); - return done(null, false); + return callback(null, false); } // Someone is sending us blocks without @@ -699,14 +674,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (!self.chain.hasBlock(block.prevBlock)) { // Special case for genesis block. if (block.isGenesis()) - return done(null, false); + return callback(null, false); // Make sure the peer doesn't send us // more than 200 orphans every 3 minutes. if (self.isOrphaning(peer)) { utils.debug('Peer is orphaning (%s)', peer.host); self.setMisbehavior(peer, 100); - return done(null, false); + return callback(null, false); } // NOTE: If we were to emit new orphans here, we @@ -715,7 +690,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { // the height until later. self._addIndex(block, peer, function(err, added) { if (err) - return done(err); + return callback(err); // Resolve orphan chain. self.peers.load.loadBlocks( @@ -725,7 +700,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { utils.debug('Handled orphan %s (%s)', block.rhash, peer.host); - return done(null, false); + return callback(null, false); }); return; @@ -734,12 +709,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (!self.chain.hasBlock(block.prevBlock)) { // Special case for genesis block. if (block.isGenesis()) - return done(null, false); + return callback(null, false); // Increase banscore by 10 if we're using getheaders. if (!self.options.multiplePeers) { if (self.setMisbehavior(peer, 10)) - return done(null, false); + return callback(null, false); } } } @@ -747,12 +722,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { // Add to index and emit/save self._addIndex(block, peer, function(err, added) { if (err) - return done(err); + return callback(err); if (added) - return done(null, true); + return callback(null, true); - return done(null, false); + return callback(null, false); }); }); }; @@ -782,7 +757,7 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) { peer._blockQueue.length, self.chain.currentTarget(), self.peers.all.length, - self._pendingBlocks.length); + self.chain.pending.length); } return callback(null, true);