From 6e3cd9da854a34b2f34a8ea411d7b5b3772f1b07 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Fri, 2 May 2014 02:27:58 +0400 Subject: [PATCH] pool: wip --- lib/bcoin/chain.js | 9 ++++--- lib/bcoin/pool.js | 66 ++++++++++++++++++++++++---------------------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 30afc97a..3933d323 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -20,18 +20,19 @@ module.exports = Chain; Chain.prototype.add = function add(block) { var res = false; do { - var rhash = block.hash(); - var prev = utils.toHex(block.prevBlock); - // No need to revalidate orphans if (!res && !block.verify()) break; + var rhash = block.hash(); + var prev = utils.toHex(block.prevBlock); + // Add orphan if (this.last && prev !== this.last) { if (!this.bloom.test(rhash) && !this.orphan.map[prev]) { this.orphan.count++; this.orphan.map[prev] = block; + this.bloom.add(rhash); } break; } @@ -63,5 +64,5 @@ Chain.prototype.getLast = function getLast() { Chain.prototype.has = function has(hash) { hash = utils.toHex(hash); - return this.bloom.test(hash) || !!this.orphan.map[hash]; + return this.bloom.test(hash); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 00f1efe8..1543b78a 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -7,17 +7,18 @@ function Pool(options) { return new Pool(options); this.options = options || {}; - this.size = options.size || 16; - this.parallel = options.parallel || 8000; - this.loadTimeout = options.loadTimeout || 10000; - this.loadWindow = options.loadWindow || 2500; - this.loadTimer = null; - this.loadWatermark = { - lo: options.lwm || 1000, - hi: options.hwm || 32000 + this.size = options.size || 3; + this.parallel = options.parallel || 1200; + this.load = { + timeout: options.loadTimeout || 5000, + window: options.loadWindow || 2500, + timer: null, + lwm: options.lwm || 1000, + hwm: options.hwm || 32000, + hiReached: false }; - this.maxRetries = options.maxRetries || 1000; - this.requestTimeout = options.requestTimeout || 15000; + this.maxRetries = options.maxRetries || 3000; + this.requestTimeout = options.requestTimeout || 30000; this.chain = new bcoin.chain(); this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, @@ -36,6 +37,7 @@ function Pool(options) { active: 0, requests: {} }; + this.finished = 0; this.createConnection = options.createConnection; assert(this.createConnection); @@ -44,12 +46,13 @@ function Pool(options) { var self = this; setInterval(function() { - console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d', + console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d d %d', self.chain.chain.length, self.chain.orphan.count, self.block.active, self.block.queue.length, Object.keys(self.block.requests).length, - process.memoryUsage().heapUsed); + process.memoryUsage().heapUsed, + self.finished - self.chain.chain.length - self.chain.orphan.count); }, 5000); } module.exports = Pool; @@ -86,7 +89,7 @@ Pool.prototype._addLoader = function _addLoader() { function destroy() { peer.destroy(); } - var timer = setTimeout(destroy, this.loadTimeout); + var timer = setTimeout(destroy, this.load.timeout); // Split blocks and request them using multiple peers peer.on('blocks', function(hashes) { @@ -107,23 +110,23 @@ Pool.prototype._addLoader = function _addLoader() { clearTimeout(timer); // Reinstantiate timeout if (self._load()) - timer = setTimeout(destroy, self.loadTimeout); + timer = setTimeout(destroy, self.load.timeout); }); }; Pool.prototype._load = function load() { - if (this.block.queue.length >= this.loadWatermark.hi) + if (this.block.queue.length >= this.load.hwm) { + this.load.hiReached = true; return false; + } + this.load.hiReached = false; // Load more blocks, starting from last hash var hash; - if (this.block.lastSeen) { + if (this.block.lastSeen) hash = this.block.lastSeen; - console.log('Loading from (last): ' + utils.toHex(hash.slice().reverse())); - } else { + else hash = this.chain.getLast().hash(); - console.log('Loading from (chain): ' + utils.toHex(hash.slice().reverse())); - } this.peers.load.loadBlocks(hash); @@ -217,39 +220,39 @@ Pool.prototype._requestBlock = function _requestBlock(hash) { }; Pool.prototype._scheduleRequests = function _scheduleRequests() { - if (this.block.active > 100) + if (this.block.active > this.parallel / 2) return; // No need to wait - already have enough data if (this.block.queue.length > this.parallel) { - if (this.loadTimer !== null) - clearTimeout(this.loadTimer); - this.loadTimer = null; + if (this.load.timer !== null) + clearTimeout(this.load.timer); + this.load.timer = null; return this._doRequests(); } // Already waiting - if (this.loadTimer !== null) + if (this.load.timer !== null) return; var self = this; - this.loadTimer = setTimeout(function() { - self.loadTimer = null; + this.load.timer = setTimeout(function() { + self.load.timer = null; self._doRequests(); - }, this.loadWindow); + }, this.load.window); }; Pool.prototype._doRequests = function _doRequests() { if (this.block.active >= this.parallel) return; - var above = this.block.queue.length >= this.loadWatermark.lo; + var above = this.block.queue.length >= this.load.lwm; var items = this.block.queue.slice(0, this.parallel - this.block.active); this.block.queue = this.block.queue.slice(items.length); - var below = this.block.queue.length < this.loadWatermark.lo; + var below = this.block.queue.length < this.load.lwm; // Watermark boundary crossed, load more blocks - if (above && below) + if (above && below && this.load.hiReached) this._load(); // Split list between nodes @@ -353,4 +356,5 @@ BlockRequest.prototype.finish = function finish() { // We may have some free slots in queue now this.pool._scheduleRequests(); + this.pool.finished++; };