Pool fixes: add a job-queue lock helper (`_lock`), raise the default load timeout to 120s, and simplify block/inventory request handling.

This commit is contained in:
Christopher Jeffrey 2016-03-05 16:55:58 -08:00
parent c9bf605a85
commit df97d0ce11
2 changed files with 103 additions and 91 deletions

View File

@ -849,7 +849,7 @@ Chain.prototype._reorganize = function _reorganize(entry, callback) {
if (err)
return next(err);
self.emit('remove block', entry);
self.emit('remove block', block);
next();
});

View File

@ -64,21 +64,16 @@ function Pool(node, options) {
this.syncing = false;
this.synced = false;
this.busy = false;
this.jobs = [];
this.load = {
timeout: options.loadTimeout || 40000,
timeout: options.loadTimeout || 120000,
interval: options.loadInterval || 20000
};
this.requestTimeout = options.requestTimeout || 2 * 60000;
// this.chain = new bcoin.chain({
// spv: options.spv,
// preload: options.preload,
// blockdb: options.blockdb,
// mempool: options.mempool
// });
this.watchMap = {};
this.bloom = new bcoin.bloom(
@ -237,6 +232,43 @@ Pool.prototype._init = function _init() {
this.startServer();
};
/**
 * Acquire the pool's serialization lock, or queue the call
 * for later if the lock is currently held.
 * @param {Function} func - Function to re-invoke once the lock
 *   frees up (only consulted when the pool is busy).
 * @param {Array} args - Arguments to apply to `func` on replay.
 * @param {Boolean} force - Caller asserts it already holds the
 *   lock; hand back a single-use no-op unlocker instead of
 *   re-acquiring.
 * @returns {Function|undefined} An `unlock` function, or
 *   `undefined` when the call was queued for later execution.
 */
Pool.prototype._lock = function _lock(func, args, force) {
  var pool = this;
  var unlocked = false;

  // Re-entrant path: the lock is already held by the caller,
  // so return an unlocker that only guards against double-release.
  if (force) {
    assert(this.busy);
    return function unlock() {
      assert(!unlocked);
      unlocked = true;
    };
  }

  // Lock is taken by someone else: defer this call until the
  // current holder releases and drains the queue.
  if (this.busy) {
    this.jobs.push([func, args]);
    return;
  }

  this.busy = true;

  // Single-use unlocker: releases the lock, then either signals
  // an empty queue via 'flush' or kicks off the next queued job.
  return function unlock() {
    var job;

    assert(!unlocked);
    unlocked = true;

    pool.busy = false;

    if (pool.jobs.length === 0) {
      pool.emit('flush');
      return;
    }

    job = pool.jobs.shift();
    job[0].apply(pool, job[1]);
  };
};
Pool.prototype.getBlocks = function getBlocks(peer, top, stop) {
var self = this;
this.chain.onFlush(function() {
@ -604,51 +636,35 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('blocks', hashes);
var req = [];
this.chain.onFlush(function() {
utils.forEachSerial(hashes, function(hash, next, i) {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
// Resolve orphan chain.
if (self.chain.hasOrphan(hash)) {
utils.debug('Peer sent a hash that is already a known orphan.');
self.resolveOrphan(peer, null, hash);
return utils.nextTick(next);
continue;
}
// Request a block if we don't have it.
self.chain.has(hash, function(err, has) {
assert(!err);
// Normally we request the hashContinue.
// In the odd case where we already have
// it, we can do one of two things: either
// force re-downloading of the block to
// continue the sync, or do a getblocks
// from the last hash.
if (i === hashes.length - 1) {
// Request more hashes:
// self.getBlocks(peer, hash, null);
if (!has) {
req.push([peer, self.block.type, hash]);
return next();
}
// Re-download the block (traditional method):
self.getData(peer, self.block.type, hash, { force: true });
// Normally we request the hashContinue.
// In the odd case where we already have
// it, we can do one of two things: either
// force re-downloading of the block to
// continue the sync, or do a getblocks
// from the last hash.
if (i === hashes.length - 1) {
// Request more hashes:
// self.getBlocks(peer, hash, null);
continue;
}
// Re-download the block (traditional method):
req.push([peer, self.block.type, hash, { force: true }]);
return next();
}
return next();
});
}, function(err) {
if (err)
return self.emit('error', err);
req.forEach(function(item) {
self.getData(item[0], item[1], item[2], item[3]);
});
});
self.getData(peer, self.block.type, hash);
}
});
// Reset interval to avoid calling getblocks unnecessarily
@ -667,15 +683,10 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer) {
for (i = 0; i < hashes.length; i++) {
hash = utils.toHex(hashes[i]);
this.chain.has(hash, function(err, has) {
assert(!err);
if (has)
return;
if (this.options.headers)
this.getHeaders(this.peers.load, null, hash);
else
this.getData(peer, this.block.type, hash);
});
if (this.options.headers)
this.getHeaders(this.peers.load, null, hash);
else
this.getData(peer, this.block.type, hash);
}
};
@ -1457,56 +1468,57 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
if (Buffer.isBuffer(hash))
hash = utils.toHex(hash);
function has(cb) {
function hasItem(cb) {
if (!options.force && type !== self.tx.type)
return self.chain.has(hash, cb);
return cb(null, false);
}
has(function(err, res) {
assert(!err);
if (res)
return;
if (self.request.map[hash]) {
if (callback)
self.request.map[hash].callback.push(callback);
return;
}
if (self.request.map[hash]) {
if (callback)
self.request.map[hash].callback.push(callback);
return;
}
item = new LoadRequest(self, peer, type, hash, callback);
item = new LoadRequest(self, peer, type, hash, callback);
if (options.noQueue)
return;
if (options.noQueue)
return;
if (type === self.tx.type) {
if (peer.queue.tx.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d txs from %s with getdata',
peer.queue.tx.length,
self.request.activeTX,
peer.host);
if (type === self.tx.type) {
if (peer.queue.tx.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d txs from %s with getdata',
peer.queue.tx.length,
self.request.activeTX,
peer.host);
peer.getData(peer.queue.tx);
peer.queue.tx.length = 0;
});
}
peer.queue.tx.push(item.start());
return;
}
if (peer.queue.block.length === 0) {
self.chain.onFlush(function() {
utils.nextTick(function() {
self.scheduleRequests(peer);
});
peer.getData(peer.queue.tx);
peer.queue.tx.length = 0;
});
}
peer.queue.block.push(item);
peer.queue.tx.push(item.start());
return;
}
if (peer.queue.block.length === 0) {
self.chain.onFlush(function() {
utils.nextTick(function() {
self.scheduleRequests(peer);
});
});
}
peer.queue.block.push(item);
hasItem(function(err, exists) {
assert(!err);
if (exists)
return item.finish();
});
};