pool: wip

This commit is contained in:
Fedor Indutny 2014-05-02 02:27:58 +04:00
parent c71258103b
commit 6e3cd9da85
2 changed files with 40 additions and 35 deletions

View File

@ -20,18 +20,19 @@ module.exports = Chain;
Chain.prototype.add = function add(block) { Chain.prototype.add = function add(block) {
var res = false; var res = false;
do { do {
var rhash = block.hash();
var prev = utils.toHex(block.prevBlock);
// No need to revalidate orphans // No need to revalidate orphans
if (!res && !block.verify()) if (!res && !block.verify())
break; break;
var rhash = block.hash();
var prev = utils.toHex(block.prevBlock);
// Add orphan // Add orphan
if (this.last && prev !== this.last) { if (this.last && prev !== this.last) {
if (!this.bloom.test(rhash) && !this.orphan.map[prev]) { if (!this.bloom.test(rhash) && !this.orphan.map[prev]) {
this.orphan.count++; this.orphan.count++;
this.orphan.map[prev] = block; this.orphan.map[prev] = block;
this.bloom.add(rhash);
} }
break; break;
} }
@ -63,5 +64,5 @@ Chain.prototype.getLast = function getLast() {
Chain.prototype.has = function has(hash) { Chain.prototype.has = function has(hash) {
hash = utils.toHex(hash); hash = utils.toHex(hash);
return this.bloom.test(hash) || !!this.orphan.map[hash]; return this.bloom.test(hash);
}; };

View File

@ -7,17 +7,18 @@ function Pool(options) {
return new Pool(options); return new Pool(options);
this.options = options || {}; this.options = options || {};
this.size = options.size || 16; this.size = options.size || 3;
this.parallel = options.parallel || 8000; this.parallel = options.parallel || 1200;
this.loadTimeout = options.loadTimeout || 10000; this.load = {
this.loadWindow = options.loadWindow || 2500; timeout: options.loadTimeout || 5000,
this.loadTimer = null; window: options.loadWindow || 2500,
this.loadWatermark = { timer: null,
lo: options.lwm || 1000, lwm: options.lwm || 1000,
hi: options.hwm || 32000 hwm: options.hwm || 32000,
hiReached: false
}; };
this.maxRetries = options.maxRetries || 1000; this.maxRetries = options.maxRetries || 3000;
this.requestTimeout = options.requestTimeout || 15000; this.requestTimeout = options.requestTimeout || 30000;
this.chain = new bcoin.chain(); this.chain = new bcoin.chain();
this.bloom = new bcoin.bloom(8 * 10 * 1024, this.bloom = new bcoin.bloom(8 * 10 * 1024,
10, 10,
@ -36,6 +37,7 @@ function Pool(options) {
active: 0, active: 0,
requests: {} requests: {}
}; };
this.finished = 0;
this.createConnection = options.createConnection; this.createConnection = options.createConnection;
assert(this.createConnection); assert(this.createConnection);
@ -44,12 +46,13 @@ function Pool(options) {
var self = this; var self = this;
setInterval(function() { setInterval(function() {
console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d', console.log('clen %d ocnt %d active %d queue %d reqs %d mem %d d %d',
self.chain.chain.length, self.chain.orphan.count, self.chain.chain.length, self.chain.orphan.count,
self.block.active, self.block.active,
self.block.queue.length, self.block.queue.length,
Object.keys(self.block.requests).length, Object.keys(self.block.requests).length,
process.memoryUsage().heapUsed); process.memoryUsage().heapUsed,
self.finished - self.chain.chain.length - self.chain.orphan.count);
}, 5000); }, 5000);
} }
module.exports = Pool; module.exports = Pool;
@ -86,7 +89,7 @@ Pool.prototype._addLoader = function _addLoader() {
function destroy() { function destroy() {
peer.destroy(); peer.destroy();
} }
var timer = setTimeout(destroy, this.loadTimeout); var timer = setTimeout(destroy, this.load.timeout);
// Split blocks and request them using multiple peers // Split blocks and request them using multiple peers
peer.on('blocks', function(hashes) { peer.on('blocks', function(hashes) {
@ -107,23 +110,23 @@ Pool.prototype._addLoader = function _addLoader() {
clearTimeout(timer); clearTimeout(timer);
// Reinstantiate timeout // Reinstantiate timeout
if (self._load()) if (self._load())
timer = setTimeout(destroy, self.loadTimeout); timer = setTimeout(destroy, self.load.timeout);
}); });
}; };
Pool.prototype._load = function load() { Pool.prototype._load = function load() {
if (this.block.queue.length >= this.loadWatermark.hi) if (this.block.queue.length >= this.load.hwm) {
this.load.hiReached = true;
return false; return false;
}
this.load.hiReached = false;
// Load more blocks, starting from last hash // Load more blocks, starting from last hash
var hash; var hash;
if (this.block.lastSeen) { if (this.block.lastSeen)
hash = this.block.lastSeen; hash = this.block.lastSeen;
console.log('Loading from (last): ' + utils.toHex(hash.slice().reverse())); else
} else {
hash = this.chain.getLast().hash(); hash = this.chain.getLast().hash();
console.log('Loading from (chain): ' + utils.toHex(hash.slice().reverse()));
}
this.peers.load.loadBlocks(hash); this.peers.load.loadBlocks(hash);
@ -217,39 +220,39 @@ Pool.prototype._requestBlock = function _requestBlock(hash) {
}; };
Pool.prototype._scheduleRequests = function _scheduleRequests() { Pool.prototype._scheduleRequests = function _scheduleRequests() {
if (this.block.active > 100) if (this.block.active > this.parallel / 2)
return; return;
// No need to wait - already have enough data // No need to wait - already have enough data
if (this.block.queue.length > this.parallel) { if (this.block.queue.length > this.parallel) {
if (this.loadTimer !== null) if (this.load.timer !== null)
clearTimeout(this.loadTimer); clearTimeout(this.load.timer);
this.loadTimer = null; this.load.timer = null;
return this._doRequests(); return this._doRequests();
} }
// Already waiting // Already waiting
if (this.loadTimer !== null) if (this.load.timer !== null)
return; return;
var self = this; var self = this;
this.loadTimer = setTimeout(function() { this.load.timer = setTimeout(function() {
self.loadTimer = null; self.load.timer = null;
self._doRequests(); self._doRequests();
}, this.loadWindow); }, this.load.window);
}; };
Pool.prototype._doRequests = function _doRequests() { Pool.prototype._doRequests = function _doRequests() {
if (this.block.active >= this.parallel) if (this.block.active >= this.parallel)
return; return;
var above = this.block.queue.length >= this.loadWatermark.lo; var above = this.block.queue.length >= this.load.lwm;
var items = this.block.queue.slice(0, this.parallel - this.block.active); var items = this.block.queue.slice(0, this.parallel - this.block.active);
this.block.queue = this.block.queue.slice(items.length); this.block.queue = this.block.queue.slice(items.length);
var below = this.block.queue.length < this.loadWatermark.lo; var below = this.block.queue.length < this.load.lwm;
// Watermark boundary crossed, load more blocks // Watermark boundary crossed, load more blocks
if (above && below) if (above && below && this.load.hiReached)
this._load(); this._load();
// Split list between nodes // Split list between nodes
@ -353,4 +356,5 @@ BlockRequest.prototype.finish = function finish() {
// We may have some free slots in queue now // We may have some free slots in queue now
this.pool._scheduleRequests(); this.pool._scheduleRequests();
this.pool.finished++;
}; };