pool improvements.

This commit is contained in:
Christopher Jeffrey 2016-02-17 05:08:19 -08:00
parent 8e81b8914d
commit 9b1effdc66
2 changed files with 39 additions and 66 deletions

View File

@ -52,7 +52,6 @@ function Peer(pool, options) {
this.banScore = 0;
this.orphans = 0;
this.orphanTime = 0;
this.activeBlocks = 0;
if (options.socket) {
this.socket = options.socket;

View File

@ -84,6 +84,9 @@ function Pool(options) {
interval: options.loadInterval || 5000
};
this._pendingBlocks = [];
this._locked = false;
this.requestTimeout = options.requestTimeout || 600000;
this.chain = new bcoin.chain({
@ -133,8 +136,8 @@ function Pool(options) {
this.request = {
map: {},
active: 0,
queue: [],
activeBlocks: 0
activeBlocks: 0,
activeTX: 0
};
this.validate = {
@ -516,10 +519,6 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
if (last && headers.length === 2000)
peer.loadHeaders(this.chain.getLocator(last), null);
// If we're not currently handling a
// block, start the request cycle.
this._nextBlock(peer);
// Reset interval to avoid calling getheaders unnecessarily
this._startInterval();
};
@ -574,10 +573,6 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
}
}
// If we're not currently handling a
// block, start the request cycle.
this._nextBlock(peer);
// Reset interval to avoid calling getblocks unnecessarily
this._startInterval();
@ -614,30 +609,26 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback)
Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
var self = this;
this._pending = this._pending || [];
if (this._lock) {
this._pending.push([block, peer, callback]);
if (this._locked) {
this._pendingBlocks.push([block, peer, callback]);
return;
}
this._lock = true;
this._locked = true;
callback = utils.asyncify(callback);
function done(err, result) {
var item;
self._lock = false;
callback(err, result);
if (self._pending.length === 0) {
self._nextBlock(peer);
return;
}
self._locked = false;
item = self._pending.shift();
if (self._pendingBlocks.length === 0)
return;
item = self._pendingBlocks.shift();
self._handleBlock(item[0], item[1], item[2]);
}
@ -781,16 +772,17 @@ Pool.prototype._addIndex = function _addIndex(block, peer, callback) {
if (self.chain.height() % 20 === 0) {
utils.debug(
'Got: %s from %s chain len %d blocks %d orp %d act %d queue %d target %s peers %d',
'Got: %s from %s chain len %d blocks %d orp %d act %d queue %d target %s peers %d pending %d',
block.rhash,
new Date(block.ts * 1000).toString(),
self.chain.height(),
self.block.total,
self.chain.orphan.count,
self.request.active,
self.request.queue.length,
peer._blockQueue.length,
self.chain.currentTarget(),
self.peers.all.length);
self.peers.all.length,
self._pendingBlocks.length);
}
return callback(null, true);
@ -1576,9 +1568,9 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
if (peer._txQueue.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d items from %s with getdata',
'Requesting %d/%d txs from %s with getdata',
peer._txQueue.length,
self.request.active,
self.request.activeTX,
peer.host);
peer.getData(peer._txQueue);
@ -1594,42 +1586,25 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
return;
}
if (peer._blockQueue.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d blocks from %s with getdata',
peer._blockQueue.length,
self.request.activeBlocks,
peer.host);
peer.getData(peer._blockQueue);
peer._blockQueue.length = 0;
});
}
peer._blockQueue.push({
type: type,
hash: hash
});
};
Pool.prototype._nextBlock = function _nextBlock(peer) {
var item;
if (peer.activeBlocks > 0)
return;
// item = peer._blockQueue.shift();
item = peer._blockQueue.slice(0, 500);
peer._blockQueue = peer._blockQueue.slice(500);
if (peer.destroyed)
return;
if (!item.length)
return;
utils.debug(
'Requesting block %s (%d/%d) from %s with getdata (height %d)',
utils.revHex(item[item.length - 1].hash),
peer._blockQueue.length,
this.request.active,
peer.host,
this.chain.height());
peer.activeBlocks += item.length;
this.request.activeBlocks += item.length;
peer.getData(item);
};
Pool.prototype._response = function _response(hash) {
var hash;
@ -1785,10 +1760,6 @@ Pool.prototype.destroy = function destroy() {
if (this.peers.load)
this.peers.load.destroy();
this.request.queue.slice().forEach(function(item) {
item.finish();
});
this.inv.list.forEach(function(entry) {
clearTimeout(entry.timer);
entry.timer = null;
@ -2015,6 +1986,10 @@ function LoadRequest(pool, peer, type, hash, cb) {
this.peer.on('close', this._finish);
this.pool.request.active++;
if (this.type === 'tx')
this.pool.request.activeTX++;
else
this.pool.request.activeBlocks++;
assert(!this.pool.request.map[this.hash]);
this.pool.request.map[this.hash] = this;
@ -2026,13 +2001,12 @@ LoadRequest.prototype.finish = function finish() {
if (this.pool.request.map[this.hash]) {
delete this.pool.request.map[this.hash];
this.pool.request.active--;
if (this.type !== 'tx')
if (this.type === 'tx')
this.pool.request.activeTX--;
else
this.pool.request.activeBlocks--;
}
if (this.type !== 'tx')
this.peer.activeBlocks--;
if (this.type === 'tx') {
index = this.peer._txQueue.indexOf(this);
if (index !== -1)