better block sync.

This commit is contained in:
Christopher Jeffrey 2016-02-19 07:13:54 -08:00
parent 7c3191aee8
commit cefeaf779b
2 changed files with 130 additions and 41 deletions

View File

@ -42,6 +42,7 @@ function Chain(options) {
this.locked = false;
this.handling = false;
this.busy = false;
this.jobQueue = [];
this.pending = [];
this.pendingBlocks = {};
this.pendingSize = 0;
@ -928,36 +929,90 @@ Chain.prototype._onUnlock = function _onUnlock(callback) {
this.once('unlock', callback);
};
Chain.prototype._onBored = function _onBored(callback) {
if (!this.busy)
return callback();
this.once('bored', callback);
};
// REALLY? Did it have to be this fucking complicated?
Chain.prototype._onReady = function _onReady(internal, callback) {
var self = this;
if (typeof internal === 'function') {
callback = internal;
internal = false;
}
if (internal)
return callback();
return callback(function() {});
if (this.busy) {
this.jobQueue.push(callback);
return;
}
self.busy = true;
this._onUnlock(function() {
if (self.locked)
return self._onReady(internal, callback);
assert(!self.locked);
assert(!self.handling);
assert(self.busy);
self.locked = true;
self._onBored(function() {
if (self.busy)
return self._onReady(internal, callback);
self.busy = true;
callback(function unlock() {
self.busy = false;
self.locked = false;
self.emit('unlock');
self.emit('bored');
});
callback(function unlock() {
var item;
assert(self.locked);
assert(!self.handling);
assert(self.busy);
self.busy = false;
self.locked = false;
if (self.jobQueue.length > 0) {
item = self.jobQueue.shift();
self._onReady(false, item);
return;
}
if (self.pending.length === 0)
return;
item = self.pending.shift();
delete self.pendingBlocks[item[0].hash('hex')];
self.pendingSize -= item[0].getSize();
self.add(item[0], item[1], item[2]);
});
});
};
function wrap(method) {
return function wrapper() {
var self = this;
var args = Array.prototype.slice.call(arguments);
var callback, internal;
if (typeof args[args.length - 1] === 'boolean')
internal = args.pop();
if (typeof args[args.length - 1] === 'function')
callback = args.pop();
this._onReady(internal, function(unlock) {
args.push(function() {
unlock();
if (!callback)
return;
return callback.apply(null, arguments);
});
method.apply(self, args);
if (!callback)
unlock();
});
};
}
Chain.prototype.add = function add(initial, peer, callback) {
var self = this;
var host = peer ? peer.host : 'unknown';
@ -1294,6 +1349,9 @@ Chain.prototype.add = function add(initial, peer, callback) {
self.emit('unlock');
if (self.locked)
return;
// Start resolving the queue
// (I love asynchronous IO).
if (self.pending.length === 0) {
@ -1551,7 +1609,6 @@ Chain.prototype.getHashRangeAsync = function getHashRangeAsync(start, end, callb
});
};
Chain.prototype.getLocator = function getLocator(start) {
var hashes = [];
var top = this.height;

View File

@ -653,7 +653,7 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
utils.debug(
'Recieved unrequested block: %s (%s)',
block.rhash, peer.host);
return;
return callback(null, false);
}
this._prehandleBlock(block, peer, function(err) {
@ -664,6 +664,8 @@ Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) {
if (err)
return callback(err);
self._startRequests(peer);
if (added === 0)
return callback(null, false);
@ -776,6 +778,10 @@ Pool.prototype._createPeer = function _createPeer(options) {
peer.on('txs', function(txs) {
self.emit('txs', txs, peer);
if (self.blockdb && !self.chain.isFull())
return;
txs.forEach(function(hash) {
hash = utils.toHex(hash);
if (self._addTX(hash, 0))
@ -1456,12 +1462,12 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
return;
}
item = new LoadRequest(this, peer, type, hash, cb);
if (options.noQueue)
return;
if (item.type === 'tx') {
item = new LoadRequest(this, peer, type, hash, cb);
if (type === 'tx') {
if (peer._txQueue.length === 0) {
utils.nextTick(function() {
utils.debug(
@ -1475,10 +1481,7 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
});
}
peer._txQueue.push({
type: type,
hash: hash
});
peer._txQueue.push(item.start());
return;
}
@ -1486,22 +1489,47 @@ Pool.prototype._request = function _request(peer, type, hash, options, cb) {
if (peer._blockQueue.length === 0) {
this.chain._onFlush(function() {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d blocks from %s with getdata',
peer._blockQueue.length,
self.request.activeBlocks,
peer.host);
peer.getData(peer._blockQueue);
peer._blockQueue.length = 0;
self._startRequests(peer);
});
});
}
peer._blockQueue.push({
type: type,
hash: hash
peer._blockQueue.push(item);
};
Pool.prototype._startRequests = function _startRequests(peer) {
var size, items;
if (this.chain.pending.length > 0)
return;
if (peer._blockQueue.length === 0)
return;
if (!this.blockdb) {
items = peer._blockQueue.slice();
peer._blockQueue.length = 0;
} else {
// Blocks start getting big after 150k
if (this.chain.height <= 150000)
size = 500;
else
size = 1;
items = peer._blockQueue.slice(0, size);
peer._blockQueue = peer._blockQueue.slice(size);
}
items = items.map(function(item) {
return item.start();
});
utils.debug(
'Requesting %d/%d blocks from %s with getdata',
items.length,
this.request.activeBlocks,
peer.host);
peer.getData(items);
};
Pool.prototype._response = function _response(hash) {
@ -1865,7 +1893,9 @@ function LoadRequest(pool, peer, type, hash, cb) {
this.cb.push(cb);
this._finish = this.finish.bind(this);
}
LoadRequest.prototype.start = function start() {
this.timeout = setTimeout(this._finish, this.pool.requestTimeout);
this.peer.on('close', this._finish);
@ -1877,7 +1907,9 @@ function LoadRequest(pool, peer, type, hash, cb) {
assert(!this.pool.request.map[this.hash]);
this.pool.request.map[this.hash] = this;
}
return this;
};
LoadRequest.prototype.finish = function finish() {
var index;