From df97d0ce11c5243b12a210aaa278e2a3f0b09dd9 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sat, 5 Mar 2016 16:55:58 -0800 Subject: [PATCH] pool fixes. --- lib/bcoin/chain.js | 2 +- lib/bcoin/pool.js | 192 ++++++++++++++++++++++++--------------------- 2 files changed, 103 insertions(+), 91 deletions(-) diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index 05e9b262..dfd04f23 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -849,7 +849,7 @@ Chain.prototype._reorganize = function _reorganize(entry, callback) { if (err) return next(err); - self.emit('remove block', entry); + self.emit('remove block', block); next(); }); diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 3bdc942f..582f8d21 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -64,21 +64,16 @@ function Pool(node, options) { this.syncing = false; this.synced = false; + this.busy = false; + this.jobs = []; this.load = { - timeout: options.loadTimeout || 40000, + timeout: options.loadTimeout || 120000, interval: options.loadInterval || 20000 }; this.requestTimeout = options.requestTimeout || 2 * 60000; - // this.chain = new bcoin.chain({ - // spv: options.spv, - // preload: options.preload, - // blockdb: options.blockdb, - // mempool: options.mempool - // }); - this.watchMap = {}; this.bloom = new bcoin.bloom( @@ -237,6 +232,43 @@ Pool.prototype._init = function _init() { this.startServer(); }; +Pool.prototype._lock = function _lock(func, args, force) { + var self = this; + var called; + + if (force) { + assert(this.busy); + return function unlock() { + assert(!called); + called = true; + }; + } + + if (this.busy) { + this.jobs.push([func, args]); + return; + } + + this.busy = true; + + return function unlock() { + var item; + + assert(!called); + called = true; + + self.busy = false; + + if (self.jobs.length === 0) { + self.emit('flush'); + return; + } + + item = self.jobs.shift(); + item[0].apply(self, item[1]); + }; +}; + Pool.prototype.getBlocks = function getBlocks(peer, top, stop) { var self = this; this.chain.onFlush(function() { @@ -604,51 +636,35 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this.emit('blocks', hashes); - var req = []; this.chain.onFlush(function() { - utils.forEachSerial(hashes, function(hash, next, i) { + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + // Resolve orphan chain. if (self.chain.hasOrphan(hash)) { utils.debug('Peer sent a hash that is already a known orphan.'); self.resolveOrphan(peer, null, hash); - return utils.nextTick(next); + continue; } - // Request a block if we don't have it. - self.chain.has(hash, function(err, has) { - assert(!err); + // Normally we request the hashContinue. + // In the odd case where we already have + // it, we can do one of two things: either + // force re-downloading of the block to + // continue the sync, or do a getblocks + // from the last hash. + if (i === hashes.length - 1) { + // Request more hashes: + // self.getBlocks(peer, hash, null); - if (!has) { - req.push([peer, self.block.type, hash]); - return next(); - } + // Re-download the block (traditional method): + self.getData(peer, self.block.type, hash, { force: true }); - // Normally we request the hashContinue. - // In the odd case where we already have - // it, we can do one of two things: either - // force re-downloading of the block to - // continue the sync, or do a getblocks - // from the last hash. - if (i === hashes.length - 1) { - // Request more hashes: - // self.getBlocks(peer, hash, null); + continue; + } - // Re-download the block (traditional method): - req.push([peer, self.block.type, hash, { force: true }]); - - return next(); - } - - return next(); - }); - }, function(err) { - if (err) - return self.emit('error', err); - - req.forEach(function(item) { - self.getData(item[0], item[1], item[2], item[3]); - }); - }); + self.getData(peer, self.block.type, hash); + } }); // Reset interval to avoid calling getblocks unnecessarily @@ -667,15 +683,10 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) { for (i = 0; i < hashes.length; i++) { hash = utils.toHex(hashes[i]); - this.chain.has(hash, function(err, has) { - assert(!err); - if (has) - return; - if (this.options.headers) - this.getHeaders(this.peers.load, null, hash); - else - this.getData(peer, this.block.type, hash); - }); + if (this.options.headers) + this.getHeaders(this.peers.load, null, hash); + else + this.getData(peer, this.block.type, hash); } }; @@ -1457,56 +1468,57 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { if (Buffer.isBuffer(hash)) hash = utils.toHex(hash); - function has(cb) { + function hasItem(cb) { if (!options.force && type !== self.tx.type) return self.chain.has(hash, cb); return cb(null, false); } - has(function(err, res) { - assert(!err); - if (res) - return; + if (self.request.map[hash]) { + if (callback) + self.request.map[hash].callback.push(callback); + return; + } - if (self.request.map[hash]) { - if (callback) - self.request.map[hash].callback.push(callback); - return; - } + item = new LoadRequest(self, peer, type, hash, callback); - item = new LoadRequest(self, peer, type, hash, callback); + if (options.noQueue) + return; - if (options.noQueue) - return; + if (type === self.tx.type) { + if (peer.queue.tx.length === 0) { + utils.nextTick(function() { + utils.debug( + 'Requesting %d/%d txs from %s with getdata', + peer.queue.tx.length, + self.request.activeTX, + peer.host); - if (type === self.tx.type) { - if (peer.queue.tx.length === 0) { - utils.nextTick(function() { - utils.debug( - 'Requesting %d/%d txs from %s with getdata', - peer.queue.tx.length, - self.request.activeTX, - peer.host); - - peer.getData(peer.queue.tx); - peer.queue.tx.length = 0; - }); - } - - peer.queue.tx.push(item.start()); - - return; - } - - if (peer.queue.block.length === 0) { - self.chain.onFlush(function() { - utils.nextTick(function() { - self.scheduleRequests(peer); - }); + peer.getData(peer.queue.tx); + peer.queue.tx.length = 0; }); } - peer.queue.block.push(item); + peer.queue.tx.push(item.start()); + + return; + } + + if (peer.queue.block.length === 0) { + self.chain.onFlush(function() { + utils.nextTick(function() { + self.scheduleRequests(peer); + }); + }); + } + + peer.queue.block.push(item); + + hasItem(function(err, exists) { + assert(!err); + + if (exists) + return item.finish(); }); };