diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 71ef673d..5f9037fc 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -52,7 +52,6 @@ function Peer(pool, options) { this.banScore = 0; this.orphans = 0; this.orphanTime = 0; - this.activeBlocks = 0; if (options.socket) { this.socket = options.socket; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 50a8d9a6..a0071fdc 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -84,6 +84,9 @@ function Pool(options) { interval: options.loadInterval || 5000 }; + this._pendingBlocks = []; + this._locked = false; + this.requestTimeout = options.requestTimeout || 600000; this.chain = new bcoin.chain({ @@ -133,8 +136,8 @@ function Pool(options) { this.request = { map: {}, active: 0, - queue: [], - activeBlocks: 0 + activeBlocks: 0, + activeTX: 0 }; this.validate = { @@ -516,10 +519,6 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { if (last && headers.length === 2000) peer.loadHeaders(this.chain.getLocator(last), null); - // If we're not currently handling a - // block, start the request cycle. - this._nextBlock(peer); - // Reset interval to avoid calling getheaders unnecessarily this._startInterval(); }; @@ -574,10 +573,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { } } - // If we're not currently handling a - // block, start the request cycle. - this._nextBlock(peer); - // Reset interval to avoid calling getblocks unnecessarily this._startInterval(); @@ -614,30 +609,26 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback) Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; - this._pending = this._pending || []; - - if (this._lock) { - this._pending.push([block, peer, callback]); + if (this._locked) { + this._pendingBlocks.push([block, peer, callback]); return; } - this._lock = true; + this._locked = true; callback = utils.asyncify(callback); function done(err, result) { var item; - self._lock = false; - callback(err, result); - if (self._pending.length === 0) { - self._nextBlock(peer); - return; - } + self._locked = false; - item = self._pending.shift(); + if (self._pendingBlocks.length === 0) + return; + + item = self._pendingBlocks.shift(); self._handleBlock(item[0], item[1], item[2]); } @@ -781,16 +772,17 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) { if (self.chain.height() % 20 === 0) { utils.debug( - 'Got: %s from %s chain len %d blocks %d orp %d act %d queue %d target %s peers %d', + 'Got: %s from %s chain len %d blocks %d orp %d act %d queue %d target %s peers %d pending %d', block.rhash, new Date(block.ts * 1000).toString(), self.chain.height(), self.block.total, self.chain.orphan.count, self.request.active, - self.request.queue.length, + peer._blockQueue.length, self.chain.currentTarget(), - self.peers.all.length); + self.peers.all.length, + self._pendingBlocks.length); } return callback(null, true); @@ -1576,9 +1568,9 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { if (peer._txQueue.length === 0) { utils.nextTick(function() { utils.debug( - 'Requesting %d/%d items from %s with getdata', + 'Requesting %d/%d txs from %s with getdata', peer._txQueue.length, - self.request.active, + self.request.activeTX, peer.host); peer.getData(peer._txQueue); @@ -1594,42 +1586,25 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { return; } + if (peer._blockQueue.length === 0) { + utils.nextTick(function() { + utils.debug( + 'Requesting %d/%d blocks from %s with getdata', + peer._blockQueue.length, + self.request.activeBlocks, + peer.host); + + peer.getData(peer._blockQueue); + peer._blockQueue.length = 0; + }); + } + peer._blockQueue.push({ type: type, hash: hash }); }; -Pool.prototype._nextBlock = function _nextBlock(peer) { - var item; - - if (peer.activeBlocks > 0) - return; - - // item = peer._blockQueue.shift(); - item = peer._blockQueue.slice(0, 500); - peer._blockQueue = peer._blockQueue.slice(500); - - if (peer.destroyed) - return; - - if (!item.length) - return; - - utils.debug( - 'Requesting block %s (%d/%d) from %s with getdata (height %d)', - utils.revHex(item[item.length - 1].hash), - peer._blockQueue.length, - this.request.active, - peer.host, - this.chain.height()); - - peer.activeBlocks += item.length; - this.request.activeBlocks += item.length; - - peer.getData(item); -}; - Pool.prototype._response = function _response(hash) { var hash; @@ -1785,10 +1760,6 @@ Pool.prototype.destroy = function destroy() { if (this.peers.load) this.peers.load.destroy(); - this.request.queue.slice().forEach(function(item) { - item.finish(); - }); - this.inv.list.forEach(function(entry) { clearTimeout(entry.timer); entry.timer = null; @@ -2015,6 +1986,10 @@ function LoadRequest(pool, peer, type, hash, cb) { this.peer.on('close', this._finish); this.pool.request.active++; + if (this.type === 'tx') + this.pool.request.activeTX++; + else + this.pool.request.activeBlocks++; assert(!this.pool.request.map[this.hash]); this.pool.request.map[this.hash] = this; @@ -2026,13 +2001,12 @@ LoadRequest.prototype.finish = function finish() { if (this.pool.request.map[this.hash]) { delete this.pool.request.map[this.hash]; this.pool.request.active--; - if (this.type !== 'tx') + if (this.type === 'tx') + this.pool.request.activeTX--; + else this.pool.request.activeBlocks--; } - if (this.type !== 'tx') - this.peer.activeBlocks--; - if (this.type === 'tx') { index = this.peer._txQueue.indexOf(this); if (index !== -1)