From 1ac3208360163d91c281ee6468a512cbf07d3acb Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sat, 3 May 2014 16:11:06 +0400 Subject: [PATCH] pool: refactor, make search work! --- lib/bcoin/bloom.js | 17 ++- lib/bcoin/chain.js | 239 +++++++++++++++++------------------ lib/bcoin/peer.js | 5 +- lib/bcoin/pool.js | 101 ++++++++------- lib/bcoin/protocol/framer.js | 7 +- 5 files changed, 197 insertions(+), 172 deletions(-) diff --git a/lib/bcoin/bloom.js b/lib/bcoin/bloom.js index 818d6299..9011b553 100644 --- a/lib/bcoin/bloom.js +++ b/lib/bcoin/bloom.js @@ -6,11 +6,11 @@ function Bloom(size, n, tweak) { return new Bloom(size, n, tweak); this.filter = new Array(Math.ceil(size / 32)); - for (var i = 0; i < this.filter.length; i++) - this.filter[i] = 0; this.size = size; this.n = n; this.tweak = tweak; + + this.reset(); } module.exports = Bloom; @@ -18,7 +18,14 @@ Bloom.prototype.hash = function hash(val, n) { return Bloom.hash(val, sum32(mul32(n, 0xfba4c795), this.tweak)) % this.size; }; -Bloom.prototype.add = function add(val) { +Bloom.prototype.reset = function reset() { + for (var i = 0; i < this.filter.length; i++) + this.filter[i] = 0; +}; + +Bloom.prototype.add = function add(val, enc) { + val = utils.toArray(val, enc); + for (var i = 0; i < this.n; i++) { var bit = this.hash(val, i); var pos = 1 << (bit & 0x1f); @@ -28,7 +35,9 @@ Bloom.prototype.add = function add(val) { } }; -Bloom.prototype.test = function test(val) { +Bloom.prototype.test = function test(val, enc) { + val = utils.toArray(val, enc); + for (var i = 0; i < this.n; i++) { var bit = this.hash(val, i); var pos = 1 << (bit & 0x1f); diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index f5d3e566..70e02b52 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -13,31 +13,77 @@ function Chain(options) { EventEmitter.call(this); this.options = options || {}; - this.blocks = []; - this.fifo = []; - this.hashes = preload.hashes.slice(); - this.ts = preload.ts.slice(); + this.block = { + list: [], + bloom: new bcoin.bloom(8 * 1024 * 1024, 16, 0xdeadbeef) + }; this.orphan = { map: {}, count: 0 }; - this.requests = { + this.index = { + bloom: new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbee0), + hashes: preload.hashes.slice(), + ts: preload.ts.slice() + }; + this.request = { map: {}, count: 0 }; - this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbeef); - if (this.hashes.length === 0) { + if (this.index.hashes.length === 0) this.add(new bcoin.block(constants.genesis)); - } else { - // Add all preloaded hashes to the bloom filter - for (var i = 0; i < this.hashes.length; i++) - this.bloom.add(utils.toArray(this.hashes[i], 'hex')); - } + + for (var i = 0; i < this.index.hashes.length; i++) + this.index.bloom.add(this.index.hashes[i], 'hex'); } util.inherits(Chain, EventEmitter); module.exports = Chain; +function compareTs(a, b) { + return a -b; +} + +Chain.prototype.probeIndex = function probeIndex(hash, ts) { + if (!this.index.bloom.test(hash, 'hex')) + return false; + + var start = 0; + var end = this.index.ts.length; + if (ts) { + start = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true); + start = Math.max(0, start - 1); + end = utils.binaryInsert(this.index.ts, ts + 2 * 3600, compareTs, true); + } + + for (var i = start; i < end; i++) + if (this.index.hashes[i] === hash) + return true; + + return false; +}; + +Chain.prototype.addIndex = function addIndex(hash, ts) { + if (this.probeIndex(hash, ts)) + return; + + var pos = utils.binaryInsert(this.index.ts, ts, compareTs); + this.index.hashes.splice(pos, 0, hash); + this.index.bloom.add(hash, 'hex'); +}; + +Chain.prototype.getRange = function getRange(ts) { + var start = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true); + var end = utils.binaryInsert(this.index.ts, ts + 2 * 3600, compareTs, true); + + if (start > 0) + start--; + if (end > 0) + end--; + + return { start: this.index.hashes[start], end: this.index.hashes[end] }; +}; + Chain.prototype.add = function add(block) { var res = false; var initial = block; @@ -47,152 +93,105 @@ Chain.prototype.add = function add(block) { break; var hash = block.hash('hex'); - var rhash = block.hash(); var prev = block.prevBlock; - // Validity period is two hours - var pos = utils.binaryInsert(this.ts, block.ts - 2 * 3600, function(a, b) { - return a - b; - }, true); - var test = this.bloom.test(rhash) && - prev !== initial && - !this.orphan.map[prev]; + // If previous block wasn't ever seen - add current to orphans + if (this.orphan.map[prev]) + break; + if (!this.probeIndex(hash, block.ts) && !this.probeIndex(prev, block.ts)) { + this.orphan.count++; + this.orphan.map[prev] = block; - var match = this.hashes.length === 0; - for (pos--; !match && 0 <= 0 && pos < this.hashes.length; pos++) - match = this.hashes[pos] === prev; - - // If last hash at ts matches prev hash or we already know this block - - // add it to either list or FIFO - if (!match && !test) { - // Add orphan - if (!this.orphan.map[prev]) { - this.orphan.count++; - this.orphan.map[prev] = block; - this.bloom.add(rhash); - this.emit('missing', prev); - } + // Add block to bloom filter, as we now have it in memory + this.block.bloom.add(hash, 'hex'); + this.emit('missing', prev, this.getRange(block.ts)); break; } - // It may be a re-requested block - if (!this.bloom.test(rhash)) { - // Sorted insert - var pos = utils.binaryInsert(this.ts, block.ts, function(a, b) { - return a - b; - }, true); + // Validated known block at this point - add it to index + this.addIndex(hash, block.ts); - this.ts.splice(pos, 0, block.ts); - this.hashes.splice(pos, 0, hash); - this.bloom.add(rhash); - } - - // Some old block for caching purposes, should be a FIFO - if (!this.covers(block.ts)) { - this.fifo.push(block); - - // A new block - } else { - // Insert block into a cache set if it isn't already there - var pos = utils.binaryInsert(this.blocks, block, function(a, b) { - return a.ts - b.ts; - }, true); - this.blocks.splice(pos, 0, block); - } - - // Fullfill requests - if (this.requests.map[hash]) { - var req = this.requests.map[hash]; - delete this.requests.map[hash]; - this.requests.count--; + // Fullfill request + if (this.request.map[hash]) { + var req = this.request.map[hash]; + delete this.request.map[hash]; + this.request.count--; req.forEach(function(cb) { cb(block); }); } + // At least one block was added res = true; + this.block.list.push(block); - // Compress old blocks - this._compress(); + if (!this.orphan.map[hash]) + break; // We have orphan child for this block - add it to chain - if (this.orphan.map[hash]) { - block = this.orphan.map[hash]; - delete this.orphan.map[hash]; - this.orphan.count--; - continue; - } - - break; + block = this.orphan.map[hash]; + delete this.orphan.map[hash]; + this.orphan.count--; } while (true); + // Compress old blocks + this._compress(); + return res; }; Chain.prototype._compress = function compress() { - // Store only last 1000 blocks and 1000 FIFO - if (this.blocks.length > 1000) - this.blocks = this.blocks.slice(-1000); - if (this.fifo.length > 1000) - this.fifo = this.fifo.slice(-1000); + // Keep at least 1000 blocks and at most 2000 + if (this.block.list.length < 2000) + return; + + // Bloom filter rebuilt is needed + this.block.list = this.block.list.slice(-1000); + this.block.bloom.reset(); + + for (var i = 0; i < this.block.list.length; i++) + this.block.bloom.add(this.block.list[i].hash()); }; -Chain.prototype.getLast = function getLast() { - return { - hash: this.hashes[this.hashes.length - 1], - ts: this.ts[this.ts.length - 1] - }; -}; - -Chain.prototype.has = function has(hash, cacheonly) { - if (!Array.isArray(hash)) - hash = utils.toArray(hash, 'hex'); - if (!cacheonly) - return this.bloom.test(hash) && !this.requests.map[utils.toHex(hash)]; - - for (var i = 0; i < this.blocks.length; i++) - if (this.blocks[i].hash('hex') === hash) - return true; - for (var i = 0; i < this.fifo.length; i++) - if (this.fifo[i].hash('hex') === hash) - return true; - - return false; +Chain.prototype.has = function has(hash) { + return this.probeIndex(hash) || !!this.orphan.map[hash]; }; Chain.prototype.get = function get(hash, cb) { - if (Array.isArray(hash)) - hash = utils.toHex(hash); + // Cached block found + if (this.block.bloom.test(hash, 'hex')) { + for (var i = 0; i < this.block.list.length; i++) + if (this.block.list[i].hash('hex') === hash) + return cb(this.block.list[i]); + assert(false); + } - for (var i = 0; i < this.blocks.length; i++) - if (this.blocks[i].hash('hex') === hash) - return cb(this.blocks[i]); - for (var i = 0; i < this.fifo.length; i++) - if (this.fifo[i].hash('hex') === hash) - return cb(this.fifo[i]); - - if (this.requests.map[hash]) { - this.requests.map[hash].push(cb); - this.requests.count++; + if (this.request.map[hash]) { + this.request.map[hash].push(cb); + this.request.count++; } else { - this.requests.map[hash] = [ cb ]; + this.request.map[hash] = [ cb ]; } this.emit('missing', hash); }; Chain.prototype.isFull = function isFull() { // < 10m since last block - return !this.requests.count && - (+new Date / 1000) - this.ts[this.ts.length - 1] < 10 * 60; + return !this.request.count && + (+new Date / 1000) - this.index.ts[this.index.ts.length - 1] < 10 * 60; }; Chain.prototype.covers = function covers(ts) { - return this.ts.length && this.ts[0] <= ts; + return ts >= this.index.ts[0]; }; -Chain.prototype.hashesFrom = function hashesFrom(ts) { - var pos = utils.binaryInsert(this.ts, ts, function(a, b) { - return a - b; - }, true); - return this.hashes.slice(pos); +Chain.prototype.hashesFrom = function hashesFrom(ts, index) { + var pos = utils.binaryInsert(this.index.ts, ts - 2 * 3600, compareTs, true); + if (pos > 0) + pos--; + return this.index.hashes.slice(pos); +}; + +Chain.prototype.getLast = function getLast() { + return this.index.hashes[this.index.hashes.length - 1]; }; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index ba362b3c..4175c07a 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -284,6 +284,7 @@ Peer.prototype._handleInv = function handleInv(items) { this.getData(txs); }; -Peer.prototype.loadBlocks = function loadBlocks(hash) { - this._write(this.framer.getBlocks([ hash ])); +Peer.prototype.loadBlocks = function loadBlocks(hash, stop) { + this._write(this.framer.getBlocks(Array.isArray(hash) ? hash : [ hash ], + stop)); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 0f6d3368..c004b59a 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -12,18 +12,18 @@ function Pool(options) { EventEmitter.call(this); this.options = options || {}; - this.size = options.size || 4; - this.parallel = options.parallel || 1000; + this.size = options.size || 2; + this.parallel = options.parallel || 2000; this.load = { timeout: options.loadTimeout || 10000, - window: options.loadWindow || 2500, + window: options.loadWindow || 250, timer: null, lwm: options.lwm || this.parallel * 2, hwm: options.hwm || this.parallel * 8, hiReached: false }; - this.maxRetries = options.maxRetries || 50; - this.requestTimeout = options.requestTimeout || 10000; + this.maxRetries = options.maxRetries || 300; + this.requestTimeout = options.requestTimeout || 30000; this.chain = new bcoin.chain(); this.bloom = new bcoin.bloom(8 * 10 * 1024, 10, @@ -37,8 +37,7 @@ function Pool(options) { load: null }; this.block = { - nextLoad: [], - loadTs: Infinity, + lastHash: null, queue: [], active: 0, requests: {} @@ -58,28 +57,21 @@ Pool.prototype._init = function _init() { this._addPeer(); var self = this; - this.chain.on('missing', function(hash) { - self._requestBlock(hash); + this.chain.on('missing', function(hash, range) { + self._requestBlock(hash, true); self._scheduleRequests(); - self._load(); + self._loadRange(range); }); - - setInterval(function() { - console.log(self.chain.fifo.length, self.chain.blocks.length, self.chain.orphan.count, self.chain.hashes.length, self.chain.ts.length); - }, 5000); }; Pool.prototype._addLoader = function _addLoader() { if (this.peers.load !== null) return; + var socket = this.createConnection(); var peer = bcoin.peer(this, socket, this.options.peer); this.peers.load = peer; - // Queue block's hash for search - if (this.block.loadTs < Infinity) - this.block.nextLoad.push(this.chain.hashesFrom(this.block.loadTs)[0]); - var self = this; peer.once('error', function() { // Just ignore, it will result in `close` anyway @@ -100,7 +92,8 @@ Pool.prototype._addLoader = function _addLoader() { function destroy() { // Chain is full and up-to-date - if (self.block.nextLoad.length === 0 && self.chain.isFull()) { + if (self.block.lastHash === null && + self.chain.isFull()) { clearTimeout(timer); peer.removeListener('close', onclose); self._removePeer(peer); @@ -113,8 +106,8 @@ Pool.prototype._addLoader = function _addLoader() { // Split blocks and request them using multiple peers peer.on('blocks', function(hashes) { if (hashes.length === 0) { - // Reset search loads - self.block.loadTs = Infinity; + // Reset global load + self.block.lastHash = null; return; } @@ -125,22 +118,31 @@ Pool.prototype._addLoader = function _addLoader() { self._scheduleRequests(); - self.block.nextLoad.push(hashes[hashes.length - 1]); + // Store last hash to continue global load + self._lastHash = hashes[hashes.length - 1]; clearTimeout(timer); + // Reinstantiate timeout if (self._load()) timer = setTimeout(destroy, self.load.timeout); }); }; -Pool.prototype._load = function load(from) { - // Requested load of earlier block ids - if (from !== undefined && this.block.loadTs > from) { - this.block.loadTs = from; - this.block.nextLoad.push(this.chain.hashesFrom(from)[0]); - } +Pool.prototype._loadRange = function _loadRange(range) { + if (!range) + return; + // We will be requesting block anyway + if (range.start === range.end) + return; + + if (!this.peers.load) + this._addLoader(); + this.peers.load.loadBlocks(range.start, range.end); +}; + +Pool.prototype._load = function _load() { if (this.block.queue.length >= this.load.hwm) { this.load.hiReached = true; return false; @@ -149,15 +151,15 @@ Pool.prototype._load = function load(from) { // Load more blocks, starting from last hash var hash; - if (this.block.nextLoad.length) { - hash = this.block.nextLoad.shift(); - } else { - hash = this.chain.getLast().hash; - } + if (this.block.lastHash) + hash = this.block.lastHash; + else + hash = this.chain.getLast(); if (!this.peers.load) this._addLoader(); - this.peers.load.loadBlocks(hash); + else + this.peers.load.loadBlocks(hash); return true; }; @@ -256,18 +258,23 @@ Pool.prototype.search = function search(id, limit, cb) { if (!limit) limit = (+new Date / 1000 - 432000); + var self = this; + var hashes = this.chain.hashesFrom(limit); + var waiting = hashes.length; + this.watch(id); - if (!this.chain.covers(limit)) { - // Load some block hashes first - this._load(limit); - } else { - // Load the blocks themselves! - this.chain.hashesFrom(limit).forEach(function(hash) { - this._requestBlock(hash, true); - }, this); - this._scheduleRequests(); - } + hashes.slice().reverse().forEach(function(hash) { + // Get the block that is in index + this.chain.get(hash, function(block) { + // Get block's prev and request it and all of it's parents up to + // the next known block hash + self.chain.get(block.prevBlock, function() { + if (--waiting === 0) + cb(); + }); + }); + }, this); }; Pool.prototype._requestBlock = function _requestBlock(hash, force) { @@ -275,7 +282,7 @@ Pool.prototype._requestBlock = function _requestBlock(hash, force) { hash = utils.toArray(hash, 'hex'); // Block should be not in chain, or be requested - if (this.chain.has(hash, force)) + if (!force && this.chain.has(hash)) return; var hex = utils.toHex(hash); @@ -314,6 +321,10 @@ Pool.prototype._doRequests = function _doRequests() { if (this.block.active >= this.parallel) return; + // No peers so far + if (this.peers.block.length === 0) + return; + var above = this.block.queue.length >= this.load.lwm; var items = this.block.queue.slice(0, this.parallel - this.block.active); this.block.queue = this.block.queue.slice(items.length); diff --git a/lib/bcoin/protocol/framer.js b/lib/bcoin/protocol/framer.js index 7770c933..c0d79eff 100644 --- a/lib/bcoin/protocol/framer.js +++ b/lib/bcoin/protocol/framer.js @@ -199,7 +199,12 @@ Framer.prototype.getBlocks = function getBlocks(hashes, stop) { off += len; } - var len = stop ? utils.copy(stop, p, off) : 0; + if (stop) { + stop = utils.toArray(stop, 'hex'); + var len = utils.copy(stop, p, off); + } else { + var len = 0; + } for (; len < 32; len++) p[off + len] = 0; assert.equal(off + len, p.length);