From b4abeceac3ce039c09e44ae962987b91367c70f6 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 29 Jan 2016 13:49:40 -0800 Subject: [PATCH] handle requesting directly by peer. --- lib/bcoin/pool.js | 55 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 44 insertions(+), 11 deletions(-) diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 55a5ee6d..46b3f369 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -464,7 +464,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { peer.host); if (headers.length > 2000) { - peer.destroy(); + self.misbehaving(peer, 100); return; } @@ -519,7 +519,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { peer.host); if (hashes.length > 500) { - peer.destroy(); + self.misbehaving(peer, 100); return; } @@ -578,7 +578,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) { if (this.options.headers) this.peers.load.loadHeaders(this.chain.locatorHashes(), hash); else - this._request(this.block.type, hash); + this._request(this.block.type, hash, peer); } } @@ -833,7 +833,7 @@ Pool.prototype._createPeer = function _createPeer(backoff, priority) { peer.on('txs', function(txs) { self.emit('txs', txs, peer); txs.forEach(function(hash) { - self._request('tx', hash); + self._request('tx', hash, peer); }); self._scheduleRequests(); }); @@ -1282,7 +1282,7 @@ Pool.prototype.search = function search(id, range, e) { return e; }; -Pool.prototype._request = function _request(type, hash, options, cb) { +Pool.prototype._request = function _request(type, hash, peer, options, cb) { if (typeof options === 'function') { cb = options; options = {}; @@ -1306,7 +1306,7 @@ Pool.prototype._request = function _request(type, hash, options, cb) { if (this.destroyed) return; - var req = new LoadRequest(this, type, hash, cb); + var req = new LoadRequest(this, type, hash, peer, cb); req.add(options.noQueue); }; @@ -1348,7 +1348,7 @@ Pool.prototype._scheduleRequests = function _scheduleRequests() { Pool.prototype._doRequests = function _doRequests() { var queue, above, items, below; var red, count, split, i, off, req, j; - var mapReq; + var mapReq, peerItems, peers; if (this.request.active >= this.parallel) return; @@ -1367,6 +1367,38 @@ Pool.prototype._doRequests = function _doRequests() { if (above && below && this.load.hiReached) this._load(); + if (items.length === 0) + return; + + peerItems = items.filter(function(item) { + return item.initialPeer != null; + }); + + items = items.filter(function(item) { + return item.initialPeer == null; + }); + + if (peerItems.length > 0) { + peers = utils.uniq(peerItems.map(function(item) { + assert(item.initialPeer); + return item.initialPeer; + })); + peers.forEach(function(peer) { + var items = peerItems.filter(function(item) { + return item.initialPeer === peer; + }); + assert(items.length); + utils.debug( + 'Requesting %s/%s items from %s with getdata', + items.length, + this.request.queue.length, + peer.host); + peer.getData(items.map(function(item) { + return item.start(peer); + })); + }, this); + } + if (items.length === 0) return; @@ -1402,7 +1434,7 @@ Pool.prototype._doRequests = function _doRequests() { }; Pool.prototype.getBlock = function getBlock(hash, cb) { - this._request('block', hash, { force: true }, function(block) { + this._request('block', hash, null, { force: true }, function(block) { cb(null, block); }); this._scheduleRequests(); @@ -1436,7 +1468,7 @@ Pool.prototype.getTX = function getTX(hash, range, cb) { // Add request without queueing it to get notification at the time of load tx = null; finished = false; - req = this._request('tx', hash, { noQueue: true }, function(t) { + req = this._request('tx', hash, null, { noQueue: true }, function(t) { finished = true; tx = t; }); @@ -1782,7 +1814,7 @@ Pool.prototype.fromJSON = function fromJSON(json) { return this; }; -function LoadRequest(pool, type, hash, cb) { +function LoadRequest(pool, type, hash, peer, cb) { var self = this; this.pool = pool; @@ -1794,6 +1826,7 @@ function LoadRequest(pool, type, hash, cb) { this.ts = +new Date(); this.active = false; this.noQueue = false; + this.initialPeer = peer || null; this.onclose = function onclose() { if (self.pool.destroyed) @@ -1819,7 +1852,7 @@ LoadRequest.prototype.start = function start(peer) { // self.retry(); // }, this.pool.requestTimeout); - this.peer = peer; + this.peer = peer || this.initialPeer; this.peer.once('close', this.onclose); return {