From cefeaf779b2847227804552b99230a5376a07a13 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 19 Feb 2016 07:13:54 -0800 Subject: [PATCH] better block sync. --- lib/bcoin/chain.js | 99 ++++++++++++++++++++++++++++++++++++---------- lib/bcoin/pool.js | 72 +++++++++++++++++++++++---------- 2 files changed, 130 insertions(+), 41 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index b828b3a6..cc9f589b 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -42,6 +42,7 @@ function Chain(options) { this.locked = false; this.handling = false; this.busy = false; + this.jobQueue = []; this.pending = []; this.pendingBlocks = {}; this.pendingSize = 0; @@ -928,36 +929,90 @@ Chain.prototype._onUnlock = function _onUnlock(callback) { this.once('unlock', callback); }; -Chain.prototype._onBored = function _onBored(callback) { - if (!this.busy) - return callback(); - - this.once('bored', callback); -}; - // REALLY? Did it have to be this fucking complicated? Chain.prototype._onReady = function _onReady(internal, callback) { var self = this; + + if (typeof internal === 'function') { + callback = internal; + internal = false; + } + if (internal) - return callback(); + return callback(function() {}); + + if (this.busy) { + this.jobQueue.push(callback); + return; + } + + self.busy = true; + this._onUnlock(function() { - if (self.locked) - return self._onReady(internal, callback); + assert(!self.locked); + assert(!self.handling); + assert(self.busy); + self.locked = true; - self._onBored(function() { - if (self.busy) - return self._onReady(internal, callback); - self.busy = true; - callback(function unlock() { - self.busy = false; - self.locked = false; - self.emit('unlock'); - self.emit('bored'); - }); + + callback(function unlock() { + var item; + + assert(self.locked); + assert(!self.handling); + assert(self.busy); + + self.busy = false; + self.locked = false; + + if (self.jobQueue.length > 0) { + item = self.jobQueue.shift(); + self._onReady(false, item); + return; + } + + if (self.pending.length === 0) + return; + + item = self.pending.shift(); + delete self.pendingBlocks[item[0].hash('hex')]; + self.pendingSize -= item[0].getSize(); + + self.add(item[0], item[1], item[2]); }); }); }; +function wrap(method) { + return function wrapper() { + var self = this; + var args = Array.prototype.slice.call(arguments); + var callback, internal; + + if (typeof args[args.length - 1] === 'boolean') + internal = args.pop(); + + if (typeof args[args.length - 1] === 'function') + callback = args.pop(); + + this._onReady(internal, function(unlock) { + args.push(function() { + unlock(); + + if (!callback) + return; + + return callback.apply(null, arguments); + }); + + method.apply(self, args); + + if (!callback) + unlock(); + }); + }; +} + Chain.prototype.add = function add(initial, peer, callback) { var self = this; var host = peer ? peer.host : 'unknown'; @@ -1294,6 +1349,9 @@ Chain.prototype.add = function add(initial, peer, callback) { self.emit('unlock'); + if (self.locked) + return; + // Start resolving the queue // (I love asynchronous IO). if (self.pending.length === 0) { @@ -1551,7 +1609,6 @@ Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callb }); }; - Chain.prototype.getLocator = function getLocator(start) { var hashes = []; var top = this.height; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 9f31db36..eb30e87a 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -653,7 +653,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { utils.debug( 'Recieved unrequested block: %s (%s)', block.rhash, peer.host); - return; + return callback(null, false); } this._prehandleBlock(block, peer, function(err) { @@ -664,6 +664,8 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { if (err) return callback(err); + self._startRequests(peer); + if (added === 0) return callback(null, false); @@ -776,6 +778,10 @@ Pool.prototype._createPeer = function _createPeer(options) { peer.on('txs', function(txs) { self.emit('txs', txs, peer); + + if (self.blockdb && !self.chain.isFull()) + return; + txs.forEach(function(hash) { hash = utils.toHex(hash); if (self._addTX(hash, 0)) @@ -1456,12 +1462,12 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { return; } - item = new LoadRequest(this, peer, type, hash, cb); - if (options.noQueue) return; - if (item.type === 'tx') { + item = new LoadRequest(this, peer, type, hash, cb); + + if (type === 'tx') { if (peer._txQueue.length === 0) { utils.nextTick(function() { utils.debug( @@ -1475,10 +1481,7 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { }); } - peer._txQueue.push({ - type: type, - hash: hash - }); + peer._txQueue.push(item.start()); return; } @@ -1486,22 +1489,47 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) { if (peer._blockQueue.length === 0) { this.chain._onFlush(function() { utils.nextTick(function() { - utils.debug( - 'Requesting %d/%d blocks from %s with getdata', - peer._blockQueue.length, - self.request.activeBlocks, - peer.host); - - peer.getData(peer._blockQueue); - peer._blockQueue.length = 0; + self._startRequests(peer); }); }); } - peer._blockQueue.push({ - type: type, - hash: hash + peer._blockQueue.push(item); +}; + +Pool.prototype._startRequests = function _startRequests(peer) { + var size, items; + + if (this.chain.pending.length > 0) + return; + + if (peer._blockQueue.length === 0) + return; + + if (!this.blockdb) { + items = peer._blockQueue.slice(); + peer._blockQueue.length = 0; + } else { + // Blocks start getting big after 150k + if (this.chain.height <= 150000) + size = 500; + else + size = 1; + items = peer._blockQueue.slice(0, size); + peer._blockQueue = peer._blockQueue.slice(size); + } + + items = items.map(function(item) { + return item.start(); }); + + utils.debug( + 'Requesting %d/%d blocks from %s with getdata', + items.length, + this.request.activeBlocks, + peer.host); + + peer.getData(items); }; Pool.prototype._response = function _response(hash) { @@ -1865,7 +1893,9 @@ function LoadRequest(pool, peer, type, hash, cb) { this.cb.push(cb); this._finish = this.finish.bind(this); +} +LoadRequest.prototype.start = function start() { this.timeout = setTimeout(this._finish, this.pool.requestTimeout); this.peer.on('close', this._finish); @@ -1877,7 +1907,9 @@ function LoadRequest(pool, peer, type, hash, cb) { assert(!this.pool.request.map[this.hash]); this.pool.request.map[this.hash] = this; -} + + return this; +}; LoadRequest.prototype.finish = function finish() { var index;