From 7a04ac25d084c261c660c1dab63aea4dffce0506 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 17 Feb 2016 13:04:26 -0800 Subject: [PATCH] add queue back to handleBlock. --- lib/bcoin/chain.js | 3 ++- lib/bcoin/pool.js | 57 +++++++++++++++++++++++++++++++++------------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 78008297..6837e236 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -43,6 +43,7 @@ function Chain(options) { this.blockdb = options.blockdb; this.locked = false; this.pending = []; + this.orphanLimit = options.orphanLimit || 20 * 1024 * 1024; this.orphan = { map: {}, @@ -1019,7 +1020,7 @@ Chain.prototype.add = function add(initial, peer, callback) { // Failsafe for large orphan chains. Do not // allow more than 20mb stored in memory. - if (self.orphan.size > 20971520) { + if (self.orphan.size > self.orphanLimit) { Object.keys(self.orphan.bmap).forEach(function(hash) { self.emit('unresolved', self.orphan.bmap[hash], peer); }); diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 5753b396..d12dfa4c 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -84,6 +84,9 @@ function Pool(options) { interval: options.loadInterval || 5000 }; + this._pendingBlocks = []; + this._locked = false; + this.requestTimeout = options.requestTimeout || 600000; this.chain = new bcoin.chain({ @@ -606,13 +609,35 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback) Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; - callback = utils.asyncify(callback); + if (this._locked) { + this._pendingBlocks.push([block, peer, callback]); + return; + } + + this._locked = true; + + function done(err, result) { + var item; + + utils.nextTick(function() { + callback(err, result); + + self._locked = false; + + if (self._pendingBlocks.length === 0) + return; + + item = self._pendingBlocks.shift(); + + self._handleBlock(item[0], item[1], item[2]); + }); + } this._prehandleBlock(block, peer, function(err) { var requested; if (err) - return callback(err); + return done(err); // Fulfill our request. requested = self._response(block); @@ -622,7 +647,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (self.block.invalid[block.hash('hex')]) { utils.debug('Peer is sending an invalid chain (%s)', peer.host); self.setMisbehavior(peer, 100); - return callback(null, false); + return done(null, false); } // Ensure this is not a continuation @@ -632,7 +657,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { 'Peer is sending an invalid continuation chain (%s)', peer.host); self.setMisbehavior(peer, 100); - return callback(null, false); + return done(null, false); } // Ignore if we already have. @@ -640,7 +665,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { utils.debug('Already have block %s (%s)', self.chain.getHeight(block), peer.host); self.setMisbehavior(peer, 1); - return callback(null, false); + return done(null, false); } // Make sure the block is valid. @@ -650,7 +675,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { block.rhash, peer.host); self.block.invalid[block.hash('hex')] = true; self.setMisbehavior(peer, 100); - return callback(null, false); + return done(null, false); } // Someone is sending us blocks without @@ -666,14 +691,14 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (!self.chain.hasBlock(block.prevBlock)) { // Special case for genesis block. if (block.isGenesis()) - return callback(null, false); + return done(null, false); // Make sure the peer doesn't send us // more than 200 orphans every 3 minutes. if (self.isOrphaning(peer)) { utils.debug('Peer is orphaning (%s)', peer.host); self.setMisbehavior(peer, 100); - return callback(null, false); + return done(null, false); } // NOTE: If we were to emit new orphans here, we @@ -682,7 +707,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { // the height until later. self._addIndex(block, peer, function(err, added) { if (err) - return callback(err); + return done(err); // Resolve orphan chain. self.peers.load.loadBlocks( @@ -692,7 +717,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { utils.debug('Handled orphan %s (%s)', block.rhash, peer.host); - return callback(null, false); + return done(null, false); }); return; @@ -701,12 +726,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (!self.chain.hasBlock(block.prevBlock)) { // Special case for genesis block. if (block.isGenesis()) - return callback(null, false); + return done(null, false); // Increase banscore by 10 if we're using getheaders. if (!self.options.multiplePeers) { if (self.setMisbehavior(peer, 10)) - return callback(null, false); + return done(null, false); } } } @@ -714,12 +739,12 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { // Add to index and emit/save self._addIndex(block, peer, function(err, added) { if (err) - return callback(err); + return done(err); if (added) - return callback(null, true); + return done(null, true); - return callback(null, false); + return done(null, false); }); }); }; @@ -749,7 +774,7 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) { peer._blockQueue.length, self.chain.currentTarget(), self.peers.all.length, - self.chain.pending.length); + self._pendingBlocks.length); } return callback(null, true);