handle requesting directly by peer.

This commit is contained in:
Christopher Jeffrey 2016-01-29 13:49:40 -08:00
parent 294e5a5b37
commit b4abeceac3

View File

@ -464,7 +464,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
peer.host);
if (headers.length > 2000) {
peer.destroy();
self.misbehaving(peer, 100);
return;
}
@ -519,7 +519,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
peer.host);
if (hashes.length > 500) {
peer.destroy();
self.misbehaving(peer, 100);
return;
}
@ -578,7 +578,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) {
if (this.options.headers)
this.peers.load.loadHeaders(this.chain.locatorHashes(), hash);
else
this._request(this.block.type, hash);
this._request(this.block.type, hash, peer);
}
}
@ -833,7 +833,7 @@ Pool.prototype._createPeer = function _createPeer(backoff, priority) {
peer.on('txs', function(txs) {
self.emit('txs', txs, peer);
txs.forEach(function(hash) {
self._request('tx', hash);
self._request('tx', hash, peer);
});
self._scheduleRequests();
});
@ -1282,7 +1282,7 @@ Pool.prototype.search = function search(id, range, e) {
return e;
};
Pool.prototype._request = function _request(type, hash, options, cb) {
Pool.prototype._request = function _request(type, hash, peer, options, cb) {
if (typeof options === 'function') {
cb = options;
options = {};
@ -1306,7 +1306,7 @@ Pool.prototype._request = function _request(type, hash, options, cb) {
if (this.destroyed)
return;
var req = new LoadRequest(this, type, hash, cb);
var req = new LoadRequest(this, type, hash, peer, cb);
req.add(options.noQueue);
};
@ -1348,7 +1348,7 @@ Pool.prototype._scheduleRequests = function _scheduleRequests() {
Pool.prototype._doRequests = function _doRequests() {
var queue, above, items, below;
var red, count, split, i, off, req, j;
var mapReq;
var mapReq, peerItems, peers;
if (this.request.active >= this.parallel)
return;
@ -1367,6 +1367,38 @@ Pool.prototype._doRequests = function _doRequests() {
if (above && below && this.load.hiReached)
this._load();
if (items.length === 0)
return;
peerItems = items.filter(function(item) {
return item.initialPeer != null;
});
items = items.filter(function(item) {
return item.initialPeer == null;
});
if (peerItems.length > 0) {
peers = utils.uniq(peerItems.map(function(item) {
assert(item.initialPeer);
return item.initialPeer;
}));
peers.forEach(function(peer) {
var items = peerItems.filter(function(item) {
return item.initialPeer === peer;
});
assert(items.length);
utils.debug(
'Requesting %s/%s items from %s with getdata',
items.length,
this.request.queue.length,
peer.host);
peer.getData(items.map(function(item) {
return item.start(peer);
}));
}, this);
}
if (items.length === 0)
return;
@ -1402,7 +1434,7 @@ Pool.prototype._doRequests = function _doRequests() {
};
Pool.prototype.getBlock = function getBlock(hash, cb) {
this._request('block', hash, { force: true }, function(block) {
this._request('block', hash, null, { force: true }, function(block) {
cb(null, block);
});
this._scheduleRequests();
@ -1436,7 +1468,7 @@ Pool.prototype.getTX = function getTX(hash, range, cb) {
// Add request without queueing it to get notification at the time of load
tx = null;
finished = false;
req = this._request('tx', hash, { noQueue: true }, function(t) {
req = this._request('tx', hash, null, { noQueue: true }, function(t) {
finished = true;
tx = t;
});
@ -1782,7 +1814,7 @@ Pool.prototype.fromJSON = function fromJSON(json) {
return this;
};
function LoadRequest(pool, type, hash, cb) {
function LoadRequest(pool, type, hash, peer, cb) {
var self = this;
this.pool = pool;
@ -1794,6 +1826,7 @@ function LoadRequest(pool, type, hash, cb) {
this.ts = +new Date();
this.active = false;
this.noQueue = false;
this.initialPeer = peer || null;
this.onclose = function onclose() {
if (self.pool.destroyed)
@ -1819,7 +1852,7 @@ LoadRequest.prototype.start = function start(peer) {
// self.retry();
// }, this.pool.requestTimeout);
this.peer = peer;
this.peer = peer || this.initialPeer;
this.peer.once('close', this.onclose);
return {