From 60c3db7cb9b65f9b77acf59e7bc468f4211099ee Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 29 Jan 2016 18:23:03 -0800 Subject: [PATCH] better request handling. --- lib/bcoin/peer.js | 2 + lib/bcoin/pool.js | 456 ++++++++++++---------------------------------- 2 files changed, 117 insertions(+), 341 deletions(-) diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index fbcf425f..f593a6c8 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -82,6 +82,8 @@ function Peer(pool, createConnection, options) { interval: this.options.pingInterval || 30000 }; + this._queue = []; + Peer.uid.iaddn(1); this.id = Peer.uid.toString(10); diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index c58e3544..8a9235e5 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -56,8 +56,6 @@ function Pool(options) { this.destroyed = false; this.size = options.size || 32; - this.parallel = options.parallel || 2000; - this.redundancy = options.redundancy || 1; this._createConnection = options.createConnection; this._createSocket = options.createSocket; @@ -87,18 +85,10 @@ function Pool(options) { this.load = { timeout: options.loadTimeout || 30000, - interval: options.loadInterval || 5000, - window: options.loadWindow || 250, - lastRange: null, - rangeWindow: options.rangeWindow || 1000, - timer: null, - lwm: options.lwm || this.parallel * 2, - hwm: options.hwm || this.parallel * 8, - hiReached: false + interval: options.loadInterval || 5000 }; - this.maxRetries = options.maxRetries || 42; - this.requestTimeout = options.requestTimeout || 10000; + this.requestTimeout = options.requestTimeout || 30000; this.chain = new bcoin.chain({ fullNode: this.options.fullNode, @@ -312,6 +302,9 @@ Pool.prototype.createConnection = function createConnection(peer, pool, options) addr = pool.usableSeed(options.priority, true); + assert(addr); + assert(addr.host); + peer.host = addr.host; peer.port = addr.port; @@ -345,12 +338,6 @@ Pool.prototype._addLoader = function _addLoader() { if (this.peers.load != null) return; - if (!this.options.headers && !this.options.multiplePeers) { - this.request.map = {}; - this.request.active = 0; - this.request.queue = []; - } - peer = this._createPeer(750 * Math.random(), true); peer.once('socket', function() { @@ -454,7 +441,7 @@ Pool.prototype.stopSync = function stopSync() { }; Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { - var i, header, last, block; + var i, header, last, block, blockPeer; assert(this.options.headers); @@ -475,6 +462,12 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { for (i = 0; i < headers.length; i++) { block = bcoin.block(headers[i], 'header'); + blockPeer = peer; + + // if (this.options.multiplePeers) { + // if (this.peers.block.length) + // blockPeer = this.peers.block[i % this.peers.block.length]; + // } if (last && block.prevBlock !== last.hash('hex')) break; @@ -483,7 +476,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { break; if (!this.chain.has(block)) - this._request(this.block.type, block.hash('hex')); + this._request(this.block.type, block.hash('hex'), blockPeer); last = block; } @@ -505,7 +498,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { this._startInterval(); // Reset timeout to avoid killing the loader - this._startTimer(); + // this._startTimer(); }; Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { @@ -553,7 +546,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { // a failsafe because we _need_ to request // the hashContinue no matter what. if (!this.chain.has(hash) || i === hashes.length - 1) { - this._request(this.block.type, hash); + this._request(this.block.type, hash, peer); continue; } } @@ -715,35 +708,6 @@ Pool.prototype.isFull = function isFull() { return this.chain.isFull(); }; -Pool.prototype._loadRange = function _loadRange(hashes, force) { - var now = +new Date(); - var last; - - if (this.options.fullNode) - return; - - if (!hashes) - return; - - if (hashes.length <= 1) - return; - - // Limit number of requests - if (!force && now - this.load.lastRange < this.load.rangeWindow) - return; - - this.load.lastRange = now; - - if (!this.peers.load) - this._addLoader(); - - last = hashes[hashes.length - 1]; - - hashes.slice(0, -1).forEach(function(hash) { - this.peers.load.loadItems([hash], last); - }, this); -}; - Pool.prototype._load = function _load() { var self = this; var next; @@ -751,13 +715,6 @@ Pool.prototype._load = function _load() { if (!this.syncing) return true; - if (this.request.queue.length >= this.load.hwm) { - this.load.hiReached = true; - return false; - } - - this.load.hiReached = false; - if (!this.peers.load) { this._addLoader(); return true; @@ -810,7 +767,7 @@ Pool.prototype._createPeer = function _createPeer(backoff, priority) { items.forEach(function(item) { var req = self.request.map[utils.toHex(item.hash)]; if (req && req.peer === peer) - req.finish(null); + item.finish(); }); }); @@ -1313,18 +1270,28 @@ Pool.prototype.search = function search(id, range, e) { }; Pool.prototype._request = function _request(type, hash, peer, options, cb) { + var self = this; + var item; + if (typeof options === 'function') { cb = options; options = {}; } + if (this.destroyed) + return; + if (!options) options = {}; - hash = utils.toHex(hash); + if (utils.isBuffer(hash)) + hash = utils.toHex(hash); - if (this.request.map[hash]) - return this.request.map[hash].addCallback(cb); + if (this.request.map[hash]) { + if (cb) + this.request.map[hash].cb.push(cb); + return; + } // Block should be not in chain, or be requested // Do not use with headers-first @@ -1333,149 +1300,103 @@ Pool.prototype._request = function _request(type, hash, peer, options, cb) { return; } - if (this.destroyed) - return; + item = new LoadRequest(this, peer, type, hash, cb); - var req = new LoadRequest(this, type, hash, peer, cb); - req.add(options.noQueue); + if (peer._queue.length === 0) { + utils.nextTick(function() { + utils.debug( + 'Requesting %d/%d items from %s with getdata', + peer._queue.length, + self.request.active, + peer.host); + + peer.getData(peer._queue); + peer._queue.length = 0; + }); + } + + peer._queue.push({ + type: type, + hash: hash + }); }; -Pool.prototype._response = function _response(entity) { - var req = this.request.map[entity.hash('hex')]; - if (!req) +function LoadRequest(pool, peer, type, hash, cb) { + this.pool = pool; + this.peer = peer; + this.type = type; + this.hash = hash; + this.cb = []; + + if (cb) + this.cb.push(cb); + + this._finish = this.finish.bind(this); + + this.timeout = setTimeout(this._finish, this.pool.requestTimeout); + this.peer.on('close', this._finish); + + this.pool.request.active++; + + assert(!this.pool.request.map[this.hash]); + this.pool.request.map[this.hash] = this; +} + +LoadRequest.prototype.finish = function finish() { + var index; + + if (this.pool.request.map[this.hash]) { + delete this.pool.request.map[this.hash]; + this.pool.request.active--; + } + + index = this.peer._queue.indexOf(this); + if (index !== -1) + this.peer._queue.splice(index, 1); + + this.peer.removeListener('close', this._finish); + + if (this.timeout != null) { + clearTimeout(this.timeout); + delete this.timeout; + } +}; + +Pool.prototype._response = function _response(hash) { + var hash; + + if (utils.isBuffer(hash)) + hash = utils.toHex(hash); + else if (hash.hash) + hash = hash.hash('hex'); + + item = this.request.map[hash]; + if (!item) return false; - req.finish(entity); + + item.finish(); + + item.cb.forEach(function(cb) { + cb(); + }); + return true; }; Pool.prototype._scheduleRequests = function _scheduleRequests() { - var self = this; - - if (this.destroyed) - return; - - if (this.request.active > this.parallel / 2) - return; - - // No need to wait - already have enough data - if (this.request.queue.length > this.parallel) { - if (this.load.timer !== null) - clearTimeout(this.load.timer); - this.load.timer = null; - return this._doRequests(); - } - - // Already waiting - if (this.load.timer !== null) - return; - - this.load.timer = setTimeout(function() { - self.load.timer = null; - self._doRequests(); - }, this.load.window); }; Pool.prototype._doRequests = function _doRequests() { - var queue, above, items, below; - var red, count, split, i, off, req, j; - var mapReq, peerItems, peers, uniq; - - if (this.request.active >= this.parallel) - return; - - // No peers so far - if (this.peers.block.length === 0) - return; - - queue = this.request.queue; - above = queue.length >= this.load.lwm; - items = queue.slice(0, this.parallel - this.request.active); - this.request.queue = queue.slice(items.length); - below = this.request.queue.length < this.load.lwm; - - // Watermark boundary crossed, load more blocks - if (above && below && this.load.hiReached) - this._load(); - - if (items.length === 0) - return; - - peerItems = items.filter(function(item) { - return item.targetPeer != null; - }); - - items = items.filter(function(item) { - return item.targetPeer == null; - }); - - if (peerItems.length > 0) { - uniq = {}; - peers = []; - peerItems.forEach(function(item) { - if (uniq[item.targetPeer.id]) - return; - uniq[item.targetPeer.id] = true; - peers.push(item.targetPeer); - })); - peers.forEach(function(peer) { - var items = peerItems.filter(function(item) { - return item.targetPeer === peer; - }); - - assert(items.length); - - utils.debug( - 'Requesting %d/%d/%d items from %s with getdata', - items.length, - this.request.queue.length, - this.request.active, - peer.host); - - peer.getData(items.map(function(item) { - return item.start(); - })); - }, this); - } - - if (items.length === 0) - return; - - if (this.options.multiplePeers) { - mapReq = function(item) { - return item.start(this.peers.block[i]); - }; - - // Split list between peers - red = this.redundancy; - count = this.peers.block.length; - split = Math.ceil(items.length * red / count); - for (i = 0, off = 0; i < count; i += red, off += split) { - req = items.slice(off, off + split).map(mapReq, this); - for (j = 0; j < red && i + j < count; j++) - this.peers.block[i + j].getData(req); - } - - return; - } - - req = items.map(function(item) { - return item.start(this.peers.load); - }, this); - - utils.debug( - 'Requesting %d/%d/%d items from %s with getdata', - req.length, - this.request.queue.length, - this.request.active, - this.peers.load.host); - - this.peers.load.getData(req); }; Pool.prototype.getBlock = function getBlock(hash, cb) { - this._request('block', hash, null, { force: true }, function(block) { + if (!this.peers.load) + return setTimeout(this.getBlock.bind(this, hash, cb), 1000); + + this._request('block', hash, this.peers.load, { force: true }, function(block) { cb(null, block); }); + this._scheduleRequests(); }; @@ -1487,6 +1408,9 @@ Pool.prototype.getTX = function getTX(hash, range, cb) { var self = this; var cbs, tx, finished, req, delta; + if (!this.peers.load) + return setTimeout(this.getBlock.bind(this, hash, cb), 1000); + if (this.options.fullNode) return cb(new Error('Cannot get tx with full node')); @@ -1507,7 +1431,7 @@ Pool.prototype.getTX = function getTX(hash, range, cb) { // Add request without queueing it to get notification at the time of load tx = null; finished = false; - req = this._request('tx', hash, null, { noQueue: true }, function(t) { + req = this._request('tx', hash, this.peers.load, { noQueue: true }, function(t) { finished = true; tx = t; }); @@ -1602,7 +1526,7 @@ Pool.prototype.destroy = function destroy() { this.peers.load.destroy(); this.request.queue.slice().forEach(function(item) { - item.finish(null); + item.finish(); }); this.inv.list.forEach(function(entry) { @@ -1617,11 +1541,6 @@ Pool.prototype.destroy = function destroy() { this.peers.block.slice().forEach(function(peer) { peer.destroy(); }); - - if (this.load.timer) - clearTimeout(this.load.timer); - - this.load.timer = null; }; Pool.prototype.getPeer = function getPeer(addr) { @@ -1826,151 +1745,6 @@ Pool.prototype.isMisbehaving = function isMisbehaving(host) { return false; }; -Pool.prototype.toJSON = function toJSON() { - return { - v: 1, - type: 'pool', - chain: this.chain.toJSON(), - requests: this.request.queue.map(function(item) { - return { - type: item.type, - hash: item.hash - }; - }) - }; -}; - -Pool.prototype.fromJSON = function fromJSON(json) { - assert.equal(json.v, 1); - assert.equal(json.type, 'pool'); - - this.chain.fromJSON(json.chain); - - json.requests.forEach(function(item) { - this._request(item.type, item.hash); - }, this); - - return this; -}; - -function LoadRequest(pool, type, hash, peer, cb) { - var self = this; - - this.pool = pool; - this.type = type; - this.hash = hash; - this.cbs = cb ? [cb] : []; - this.timer = null; - this.peer = null; - this.ts = +new Date(); - this.active = false; - this.noQueue = false; - this.targetPeer = peer || null; - - this.onclose = function onclose() { - if (self.pool.destroyed) - self.clear(); - else - self.retry(); - }; -} - -LoadRequest.prototype.start = function start(peer) { - var self = this; - var reqType; - - assert(!this.active); - assert(!this.timer); - assert(!this.peer); - - this.active = true; - this.pool.request.active++; - - this.timer = setTimeout(function() { - self.timer = null; - self.retry(); - }, this.pool.requestTimeout); - - this.peer = peer || this.targetPeer; - this.peer.once('close', this.onclose); - - return { - type: this.type, - hash: this.hash - }; -}; - -LoadRequest.compare = function compare(a, b) { - return a.ts - b.ts; -}; - -LoadRequest.prototype.add = function add(noQueue) { - assert(!this.pool.request.map[this.hash]); - this.pool.request.map[this.hash] = this; - if (!noQueue) - this.pool.request.queue.push(this); - else - this.noQueue = true; -}; - -LoadRequest.prototype.clear = function clear() { - assert(this.active); - this.pool.request.active--; - this.active = false; - this.peer.removeListener('close', this.onclose); - this.peer = null; - clearTimeout(this.timer); - this.timer = null; -}; - -LoadRequest.prototype.retry = function retry() { - var peer = this.peer; - - // Put block into the queue, ensure that the queue is always sorted by ts - utils.binaryInsert(this.pool.request.queue, this, LoadRequest.compare); - - this.clear(); - - // Kill peer, if it misbehaves - if (++peer._retry > this.pool.maxRetries) - peer.destroy(); - - // And schedule requesting blocks again - this.pool._scheduleRequests(); -}; - -LoadRequest.prototype.finish = function finish(entity) { - var index; - - if (this.active) { - this.clear(); - index = this.pool.request.queue.indexOf(this); - assert(index !== -1 || this.noQueue); - } else { - // It could be that request was never sent to the node, remove it from - // queue and forget about it - index = this.pool.request.queue.indexOf(this); - assert(index !== -1 || this.noQueue); - if (!this.noQueue) - this.pool.request.queue.splice(index, 1); - assert(!this.peer); - assert(!this.timer); - } - delete this.pool.request.map[this.hash]; - - // We may have some free slots in queue now - this.pool._scheduleRequests(); - - this.cbs.forEach(function(cb) { - cb(entity); - }); -}; - -LoadRequest.prototype.addCallback = function addCallback(cb) { - if (cb) - this.cbs.push(cb); -}; - /** * Expose */