diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index f2e5ea2a..f5d3e566 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -14,13 +14,17 @@ function Chain(options) { this.options = options || {}; this.blocks = []; + this.fifo = []; this.hashes = preload.hashes.slice(); this.ts = preload.ts.slice(); this.orphan = { map: {}, count: 0 }; - this.requests = {}; + this.requests = { + map: {}, + count: 0 + }; this.bloom = new bcoin.bloom(28 * 1024 * 1024, 33, 0xdeadbeef); if (this.hashes.length === 0) { @@ -36,21 +40,33 @@ module.exports = Chain; Chain.prototype.add = function add(block) { var res = false; + var initial = block; do { // No need to revalidate orphans if (!res && !block.verify()) break; + var hash = block.hash('hex'); var rhash = block.hash(); var prev = block.prevBlock; - var pos = utils.binaryInsert(this.ts, block.ts, function(a, b) { + + // Validity period is two hours + var pos = utils.binaryInsert(this.ts, block.ts - 2 * 3600, function(a, b) { return a - b; }, true); - var last = (this.hashes.length && pos > 0) ? this.hashes[pos - 1] : null; + var test = this.bloom.test(rhash) && + prev !== initial && + !this.orphan.map[prev]; - // Add orphan - if (last && prev !== last && !this.bloom.test(rhash)) { - if (!this.bloom.test(rhash) && !this.orphan.map[prev]) { + var match = this.hashes.length === 0; + for (pos--; !match && 0 <= 0 && pos < this.hashes.length; pos++) + match = this.hashes[pos] === prev; + + // If last hash at ts matches prev hash or we already know this block - + // add it to either list or FIFO + if (!match && !test) { + // Add orphan + if (!this.orphan.map[prev]) { this.orphan.count++; this.orphan.map[prev] = block; this.bloom.add(rhash); @@ -59,8 +75,6 @@ Chain.prototype.add = function add(block) { break; } - var hash = block.hash('hex'); - // It may be a re-requested block if (!this.bloom.test(rhash)) { // Sorted insert @@ -73,13 +87,24 @@ Chain.prototype.add = function add(block) { this.bloom.add(rhash); } - // Blocks is a FIFO queue, acting like a cache for the requests - this.blocks.push(block); + // Some old block for caching purposes, should be a FIFO + if (!this.covers(block.ts)) { + this.fifo.push(block); + + // A new block + } else { + // Insert block into a cache set if it isn't already there + var pos = utils.binaryInsert(this.blocks, block, function(a, b) { + return a.ts - b.ts; + }, true); + this.blocks.splice(pos, 0, block); + } // Fullfill requests - if (this.requests[hash]) { - var req = this.requests[hash]; - delete this.requests[hash]; + if (this.requests.map[hash]) { + var req = this.requests.map[hash]; + delete this.requests.map[hash]; + this.requests.count--; req.forEach(function(cb) { cb(block); }); @@ -105,11 +130,11 @@ Chain.prototype.add = function add(block) { }; Chain.prototype._compress = function compress() { - // Store only last 1000 blocks, others will be requested if needed - if (this.blocks.length < 1000) - return; - - this.blocks = this.blocks.slice(this.blocks.length - 1000); + // Store only last 1000 blocks and 1000 FIFO + if (this.blocks.length > 1000) + this.blocks = this.blocks.slice(-1000); + if (this.fifo.length > 1000) + this.fifo = this.fifo.slice(-1000); }; Chain.prototype.getLast = function getLast() { @@ -119,10 +144,20 @@ Chain.prototype.getLast = function getLast() { }; }; -Chain.prototype.has = function has(hash) { +Chain.prototype.has = function has(hash, cacheonly) { if (!Array.isArray(hash)) hash = utils.toArray(hash, 'hex'); - return this.bloom.test(hash) && !this.requests[utils.toHex(hash)]; + if (!cacheonly) + return this.bloom.test(hash) && !this.requests.map[utils.toHex(hash)]; + + for (var i = 0; i < this.blocks.length; i++) + if (this.blocks[i].hash('hex') === hash) + return true; + for (var i = 0; i < this.fifo.length; i++) + if (this.fifo[i].hash('hex') === hash) + return true; + + return false; }; Chain.prototype.get = function get(hash, cb) { @@ -132,15 +167,32 @@ Chain.prototype.get = function get(hash, cb) { for (var i = 0; i < this.blocks.length; i++) if (this.blocks[i].hash('hex') === hash) return cb(this.blocks[i]); + for (var i = 0; i < this.fifo.length; i++) + if (this.fifo[i].hash('hex') === hash) + return cb(this.fifo[i]); - if (this.requests[hash]) - this.requests[hash].push(cb); - else - this.requests[hash] = [ cb ]; + if (this.requests.map[hash]) { + this.requests.map[hash].push(cb); + this.requests.count++; + } else { + this.requests.map[hash] = [ cb ]; + } this.emit('missing', hash); }; Chain.prototype.isFull = function isFull() { // < 10m since last block - return (+new Date / 1000) - this.ts[this.ts.length - 1] < 10 * 60; + return !this.requests.count && + (+new Date / 1000) - this.ts[this.ts.length - 1] < 10 * 60; +}; + +Chain.prototype.covers = function covers(ts) { + return this.ts.length && this.ts[0] <= ts; +}; + +Chain.prototype.hashesFrom = function hashesFrom(ts) { + var pos = utils.binaryInsert(this.ts, ts, function(a, b) { + return a - b; + }, true); + return this.hashes.slice(pos); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 51217478..0f6d3368 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -12,7 +12,7 @@ function Pool(options) { EventEmitter.call(this); this.options = options || {}; - this.size = options.size || 2; + this.size = options.size || 4; this.parallel = options.parallel || 1000; this.load = { timeout: options.loadTimeout || 10000, @@ -37,13 +37,12 @@ function Pool(options) { load: null }; this.block = { - lastSeen: null, - lastTs: null, + nextLoad: [], + loadTs: Infinity, queue: [], active: 0, requests: {} }; - this.searches = []; this.createConnection = options.createConnection; assert(this.createConnection); @@ -62,17 +61,25 @@ Pool.prototype._init = function _init() { this.chain.on('missing', function(hash) { self._requestBlock(hash); self._scheduleRequests(); - if (self.peers.load === null) - self._addLoader(); + self._load(); }); + + setInterval(function() { + console.log(self.chain.fifo.length, self.chain.blocks.length, self.chain.orphan.count, self.chain.hashes.length, self.chain.ts.length); + }, 5000); }; Pool.prototype._addLoader = function _addLoader() { - assert(this.peers.load === null); + if (this.peers.load !== null) + return; var socket = this.createConnection(); var peer = bcoin.peer(this, socket, this.options.peer); this.peers.load = peer; + // Queue block's hash for search + if (this.block.loadTs < Infinity) + this.block.nextLoad.push(this.chain.hashesFrom(this.block.loadTs)[0]); + var self = this; peer.once('error', function() { // Just ignore, it will result in `close` anyway @@ -93,13 +100,12 @@ Pool.prototype._addLoader = function _addLoader() { function destroy() { // Chain is full and up-to-date - if (self.chain.isFull()) { + if (self.block.nextLoad.length === 0 && self.chain.isFull()) { clearTimeout(timer); peer.removeListener('close', onclose); self._removePeer(peer); } - self.block.lastSeen = null; peer.destroy(); } var timer = setTimeout(destroy, this.load.timeout); @@ -107,7 +113,8 @@ Pool.prototype._addLoader = function _addLoader() { // Split blocks and request them using multiple peers peer.on('blocks', function(hashes) { if (hashes.length === 0) { - self.block.lastSeen = null; + // Reset search loads + self.block.loadTs = Infinity; return; } @@ -118,7 +125,7 @@ Pool.prototype._addLoader = function _addLoader() { self._scheduleRequests(); - self.block.lastSeen = hashes[hashes.length - 1]; + self.block.nextLoad.push(hashes[hashes.length - 1]); clearTimeout(timer); // Reinstantiate timeout @@ -127,7 +134,13 @@ Pool.prototype._addLoader = function _addLoader() { }); }; -Pool.prototype._load = function load() { +Pool.prototype._load = function load(from) { + // Requested load of earlier block ids + if (from !== undefined && this.block.loadTs > from) { + this.block.loadTs = from; + this.block.nextLoad.push(this.chain.hashesFrom(from)[0]); + } + if (this.block.queue.length >= this.load.hwm) { this.load.hiReached = true; return false; @@ -136,11 +149,14 @@ Pool.prototype._load = function load() { // Load more blocks, starting from last hash var hash; - if (this.block.lastSeen) - hash = this.block.lastSeen; - else + if (this.block.nextLoad.length) { + hash = this.block.nextLoad.shift(); + } else { hash = this.chain.getLast().hash; + } + if (!this.peers.load) + this._addLoader(); this.peers.load.loadBlocks(hash); return true; @@ -225,24 +241,41 @@ Pool.prototype.watch = function watch(id) { this.peers.block[i].updateWatch(); }; -Pool.prototype.search = function search(id, limit) { +Pool.prototype.search = function search(id, limit, cb) { if (typeof id === 'string') id = utils.toArray(id, 'hex'); - this.watch(id); - this.searches.push({ - id: id, - // Last 30 days by default - limit: limit || (+new Date / 1000 - 2592000) - }); + // Optional limit argument + if (typeof limit === 'function') { + cb = limit; + limit = null; + } + + // Last 5 days by default, this covers 1000 blocks that we have in the + // chain by default + if (!limit) + limit = (+new Date / 1000 - 432000); + + this.watch(id); + + if (!this.chain.covers(limit)) { + // Load some block hashes first + this._load(limit); + } else { + // Load the blocks themselves! + this.chain.hashesFrom(limit).forEach(function(hash) { + this._requestBlock(hash, true); + }, this); + this._scheduleRequests(); + } }; -Pool.prototype._requestBlock = function _requestBlock(hash) { +Pool.prototype._requestBlock = function _requestBlock(hash, force) { if (typeof hash === 'string') hash = utils.toArray(hash, 'hex'); - // Block is already in chain, or being requested - if (this.chain.has(hash)) + // Block should be not in chain, or be requested + if (this.chain.has(hash, force)) return; var hex = utils.toHex(hash);