improve sync. misc fixes.

This commit is contained in:
Christopher Jeffrey 2016-03-06 00:57:18 -08:00
parent 30f454e184
commit 51b2e1f9a9
4 changed files with 210 additions and 137 deletions

View File

@ -33,7 +33,7 @@ ec.generatePrivateKey = function generatePrivateKey() {
ec.publicKeyCreate = function publicKeyCreate(priv, compressed) { ec.publicKeyCreate = function publicKeyCreate(priv, compressed) {
assert(Buffer.isBuffer(priv)); assert(Buffer.isBuffer(priv));
if (bcoin.ecdsa.secp256k1) if (bcoin.secp256k1)
return bcoin.secp256k1.publicKeyCreate(priv, compressed); return bcoin.secp256k1.publicKeyCreate(priv, compressed);
priv = bcoin.ecdsa.keyPair({ priv: priv }).getPublic(compressed, 'array'); priv = bcoin.ecdsa.keyPair({ priv: priv }).getPublic(compressed, 'array');

View File

@ -163,14 +163,17 @@ Peer.prototype._init = function init() {
self.destroy(); self.destroy();
return; return;
} }
self.ack = true; self.ack = true;
self.emit('ack'); self.emit('ack');
self.ts = utils.now(); self.ts = utils.now();
self._write(self.framer.packet('getaddr', new Buffer([]))); self._write(self.framer.packet('getaddr', new Buffer([])));
// if (self.pool.options.headers) {
// if (self.version && self.version.v > 70012) if (self.pool.options.headers) {
// self._write(self.framer.packet('sendheaders', new Buffer([]))); if (self.version && self.version.v > 70012)
// } self._write(self.framer.packet('sendheaders', new Buffer([])));
}
}); });
// Send hello // Send hello

View File

@ -59,13 +59,14 @@ function Pool(node, options) {
options.headers = true; options.headers = true;
} else { } else {
if (options.headers == null) if (options.headers == null)
options.headers = false; options.headers = true;
} }
this.syncing = false; this.syncing = false;
this.synced = false; this.synced = false;
this.busy = false; this.busy = false;
this.jobs = []; this.jobs = [];
this._scheduled = false;
this.load = { this.load = {
timeout: options.loadTimeout || 120000, timeout: options.loadTimeout || 120000,
@ -269,64 +270,80 @@ Pool.prototype._lock = function _lock(func, args, force) {
}; };
}; };
Pool.prototype.getBlocks = function getBlocks(peer, top, stop) { Pool.prototype.getBlocks = function getBlocks(peer, top, stop, callback) {
var self = this; var self = this;
callback = utils.ensure(callback);
this.chain.onFlush(function() { this.chain.onFlush(function() {
self.chain.getLocator(top, function(err, locator) { self.chain.getLocator(top, function(err, locator) {
if (err) if (err)
throw err; return callback(err);
peer.getBlocks(locator, stop); peer.getBlocks(locator, stop);
callback();
}); });
}); });
}; };
Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan) { Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan, callback) {
var self = this; var self = this;
callback = utils.ensure(callback);
assert(orphan); assert(orphan);
this.chain.onFlush(function() { this.chain.onFlush(function() {
self.chain.getLocator(top, function(err, locator) { self.chain.getLocator(top, function(err, locator) {
if (err) if (err)
throw err; return callback(err);
orphan = self.chain.getOrphanRoot(orphan); orphan = self.chain.getOrphanRoot(orphan);
// Was probably resolved. // Was probably resolved.
if (!orphan) { if (!orphan) {
utils.debug('Orphan root was already resolved.'); utils.debug('Orphan root was already resolved.');
return; return callback();
} }
// If we're already processing the block // If we're already processing the block
// that would resolve this, ignore. // that would resolve this, ignore.
// if (self.request.map[orphan.soil]) { // if (self.request.map[orphan.soil]) {
// utils.debug('Already requested orphan "soil".'); // utils.debug('Already requested orphan "soil".');
// return; // return callback();
// } // }
if (self.chain.hasPending(orphan.soil)) { if (self.chain.hasPending(orphan.soil)) {
utils.debug('Already processing orphan "soil".'); utils.debug('Already processing orphan "soil".');
return; return callback();
} }
// if (self.chain.has(orphan.soil)) { // if (self.chain.has(orphan.soil)) {
// utils.debug('Already have orphan "soil". Race condition?'); // utils.debug('Already have orphan "soil". Race condition?');
// return; // return callback();
// } // }
peer.getBlocks(locator, orphan.root); peer.getBlocks(locator, orphan.root);
callback();
}); });
}); });
}; };
Pool.prototype.getHeaders = function getHeaders(peer, top, stop) { Pool.prototype.getHeaders = function getHeaders(peer, top, stop, callback) {
var self = this; var self = this;
callback = utils.ensure(callback);
this.chain.onFlush(function() { this.chain.onFlush(function() {
self.chain.getLocator(top, function(err, locator) { self.chain.getLocator(top, function(err, locator) {
if (err) if (err)
throw err; return callback(err);
peer.getHeaders(locator, stop); peer.getHeaders(locator, stop);
callback();
}); });
}); });
}; };
@ -513,19 +530,31 @@ Pool.prototype._addLoader = function _addLoader() {
peer.on('blocks', function(hashes) { peer.on('blocks', function(hashes) {
if (!self.syncing) if (!self.syncing)
return; return;
self._handleInv(hashes, peer);
self._handleInv(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
}); });
peer.on('headers', function(headers) { peer.on('headers', function(headers) {
if (!self.syncing) if (!self.syncing)
return; return;
self._handleHeaders(headers, peer);
self._handleHeaders(headers, peer, function(err) {
if (err)
self.emit('error', err);
});
}); });
} else { } else {
peer.on('blocks', function(hashes) { peer.on('blocks', function(hashes) {
if (!self.syncing) if (!self.syncing)
return; return;
self._handleBlocks(hashes, peer);
self._handleBlocks(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
}); });
} }
}; };
@ -561,14 +590,16 @@ Pool.prototype.stopSync = function stopSync() {
this._stopTimer(); this._stopTimer();
}; };
Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) {
var self = this; var self = this;
var i, header, last, block; var last;
assert(this.options.headers); assert(this.options.headers);
callback = utils.ensure(callback);
if (headers.length === 0) if (headers.length === 0)
return; return callback();
utils.debug( utils.debug(
'Recieved %s headers from %s', 'Recieved %s headers from %s',
@ -577,26 +608,32 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
if (headers.length > 2000) { if (headers.length > 2000) {
this.setMisbehavior(peer, 100); this.setMisbehavior(peer, 100);
return; return callback();
} }
this.emit('headers', headers); this.emit('headers', headers);
this.chain.onFlush(function() { // Reset interval to avoid calling getheaders unnecessarily
for (i = 0; i < headers.length; i++) { this._startInterval();
header = headers[i];
hash = header.hash('hex');
if (last && header.prevBlock !== last) utils.forEachSerial(headers, function(header, next) {
break; hash = header.hash('hex');
if (!header.verify()) if (last && header.prevBlock !== last)
break; return next(new Error('Bad header chain.'));
self.getData(peer, self.block.type, hash); if (!header.verify())
return next(new Error('Headers invalid.'));
last = hash; last = hash;
}
self.getData(peer, self.block.type, hash, next);
}, function(err) {
if (err)
return callback(err);
// Schedule the getdata's we just added.
self.scheduleRequests(peer);
// Restart the getheaders process // Restart the getheaders process
// Technically `last` is not indexed yet so // Technically `last` is not indexed yet so
@ -606,21 +643,21 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
// simply tries to find the latest block in // simply tries to find the latest block in
// the peer's chain. // the peer's chain.
if (last && headers.length === 2000) if (last && headers.length === 2000)
self.getHeaders(peer, last, null); self.getHeaders(peer, last, null, callback);
else
callback();
}); });
// Reset interval to avoid calling getheaders unnecessarily
this._startInterval();
}; };
Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
var self = this; var self = this;
var i, hash;
assert(!this.options.headers); assert(!this.options.headers);
callback = utils.ensure(callback);
if (hashes.length === 0) if (hashes.length === 0)
return; return callback();
utils.debug( utils.debug(
'Recieved %s block hashes from %s', 'Recieved %s block hashes from %s',
@ -636,58 +673,71 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('blocks', hashes); this.emit('blocks', hashes);
this.chain.onFlush(function() {
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
// Resolve orphan chain.
if (self.chain.hasOrphan(hash)) {
utils.debug('Peer sent a hash that is already a known orphan.');
self.resolveOrphan(peer, null, hash);
continue;
}
// Normally we request the hashContinue.
// In the odd case where we already have
// it, we can do one of two things: either
// force re-downloading of the block to
// continue the sync, or do a getblocks
// from the last hash.
if (i === hashes.length - 1) {
// Request more hashes:
// self.getBlocks(peer, hash, null);
// Re-download the block (traditional method):
self.getData(peer, self.block.type, hash, { force: true });
continue;
}
self.getData(peer, self.block.type, hash);
}
});
// Reset interval to avoid calling getblocks unnecessarily // Reset interval to avoid calling getblocks unnecessarily
this._startInterval(); this._startInterval();
// Reset timeout to avoid killing the loader // Reset timeout to avoid killing the loader
this._startTimer(); this._startTimer();
utils.forEachSerial(hashes, function(hash, next, i) {
hash = utils.toHex(hash);
// Resolve orphan chain.
if (self.chain.hasOrphan(hash)) {
utils.debug('Peer sent a hash that is already a known orphan.');
self.resolveOrphan(peer, null, hash, next);
return;
}
// Normally we request the hashContinue.
// In the odd case where we already have
// it, we can do one of two things: either
// force re-downloading of the block to
// continue the sync, or do a getblocks
// from the last hash.
if (i === hashes.length - 1) {
// Request more hashes:
// self.getBlocks(peer, hash, null, next);
// Re-download the block (traditional method):
self.getData(peer, self.block.type, hash, { force: true }, next);
return;
}
// Request block.
self.getData(peer, self.block.type, hash, next);
}, function(err) {
if (err)
return callback(err);
self.scheduleRequests(peer);
callback();
});
}; };
Pool.prototype._handleInv = function _handleInv(hashes, peer) { Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) {
var i, hash; var self = this;
// Ignore for now if we're still syncing // Ignore for now if we're still syncing
if (!this.synced) if (!this.synced)
return; return;
for (i = 0; i < hashes.length; i++) { callback = utils.ensure(callback);
hash = utils.toHex(hashes[i]);
if (this.options.headers) utils.forEachSerial(hashes, function(hash, next) {
this.getHeaders(this.peers.load, null, hash); hash = utils.toHex(hash);
if (self.options.headers)
self.getHeaders(peer, null, hash, next);
else else
this.getData(peer, this.block.type, hash); self.getData(peer, self.block.type, hash, next);
} }, function(err) {
if (err)
return callback(err);
self.scheduleRequests(peer);
callback();
});
}; };
Pool.prototype._prehandleTX = function _prehandleTX(tx, peer, callback) { Pool.prototype._prehandleTX = function _prehandleTX(tx, peer, callback) {
@ -825,8 +875,8 @@ Pool.prototype._createPeer = function _createPeer(options) {
if (self.options.discoverPeers === false) if (self.options.discoverPeers === false)
return; return;
if (self.seeds.length > 1000) if (self.seeds.length > 300)
self.setSeeds(self.seeds.slice(-500)); self.setSeeds(self.seeds.slice(-150));
self.addSeed(data); self.addSeed(data);
@ -837,7 +887,7 @@ Pool.prototype._createPeer = function _createPeer(options) {
self.emit('txs', txs, peer); self.emit('txs', txs, peer);
if (!self.options.spv) { if (!self.options.spv) {
if (!self.synced) if (self.syncing && !self.synced)
return; return;
} }
@ -973,17 +1023,22 @@ Pool.prototype._addLeech = function _addLeech(socket) {
peer.on('merkleblock', function(block) { peer.on('merkleblock', function(block) {
if (!self.options.spv) if (!self.options.spv)
return; return;
self._handleBlock(block, peer); self._handleBlock(block, peer);
}); });
peer.on('block', function(block) { peer.on('block', function(block) {
if (self.options.spv) if (self.options.spv)
return; return;
self._handleBlock(block, peer); self._handleBlock(block, peer);
}); });
peer.on('blocks', function(hashes) { peer.on('blocks', function(hashes) {
self._handleInv(hashes, peer); self._handleInv(hashes, peer, function(err) {
if (err)
self.emit('error', err);
});
}); });
utils.nextTick(function() { utils.nextTick(function() {
@ -1459,8 +1514,10 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
options = {}; options = {};
} }
callback = utils.ensure(callback);
if (this.destroyed) if (this.destroyed)
return; return callback();
if (!options) if (!options)
options = {}; options = {};
@ -1468,61 +1525,72 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) {
if (Buffer.isBuffer(hash)) if (Buffer.isBuffer(hash))
hash = utils.toHex(hash); hash = utils.toHex(hash);
function hasItem(cb) { function done(err, exists) {
if (!options.force && type !== self.tx.type) if (err)
return self.chain.has(hash, cb); return callback(err);
return cb(null, false);
}
if (self.request.map[hash]) {
if (callback)
self.request.map[hash].callback.push(callback);
return;
}
item = new LoadRequest(self, peer, type, hash, callback);
if (options.noQueue)
return;
if (type === self.tx.type) {
if (peer.queue.tx.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d txs from %s with getdata',
peer.queue.tx.length,
self.request.activeTX,
peer.host);
peer.getData(peer.queue.tx);
peer.queue.tx.length = 0;
});
}
peer.queue.tx.push(item.start());
return;
}
if (peer.queue.block.length === 0) {
self.chain.onFlush(function() {
utils.nextTick(function() {
self.scheduleRequests(peer);
});
});
}
peer.queue.block.push(item);
hasItem(function(err, exists) {
assert(!err);
if (exists) if (exists)
return item.finish(); return callback();
});
if (self.request.map[hash]) {
// if (callback)
// self.request.map[hash].callback.push(callback);
return callback();
}
// item = new LoadRequest(self, peer, type, hash, callback);
item = new LoadRequest(self, peer, type, hash);
if (options.noQueue)
return callback();
if (type === self.tx.type) {
if (peer.queue.tx.length === 0) {
utils.nextTick(function() {
utils.debug(
'Requesting %d/%d txs from %s with getdata',
peer.queue.tx.length,
self.request.activeTX,
peer.host);
peer.getData(peer.queue.tx);
peer.queue.tx.length = 0;
});
}
peer.queue.tx.push(item.start());
return callback();
}
peer.queue.block.push(item);
return callback();
}
if (!options.force && type !== self.tx.type)
return self.chain.has(hash, done);
return done(null, false);
}; };
Pool.prototype.scheduleRequests = function scheduleRequests(peer) { Pool.prototype.scheduleRequests = function scheduleRequests(peer) {
var self = this;
if (this._scheduled)
return;
this._scheduled = true;
this.chain.onFlush(function() {
utils.nextTick(function() {
self._sendRequests(peer);
self._scheduled = false;
});
});
};
Pool.prototype._sendRequests = function _sendRequests(peer) {
var size, items; var size, items;
if (this.chain.pending.length > 0) if (this.chain.pending.length > 0)
@ -1590,6 +1658,8 @@ Pool.prototype.getBlock = function getBlock(hash, callback) {
this.getData(this.peers.load, 'block', hash, { force: true }, function(block) { this.getData(this.peers.load, 'block', hash, { force: true }, function(block) {
callback(null, block); callback(null, block);
}); });
this.scheduleRequests(this.peers.load);
}; };
Pool.prototype.sendBlock = function sendBlock(block) { Pool.prototype.sendBlock = function sendBlock(block) {

View File

@ -72,13 +72,13 @@ describe('Protocol', function() {
assert.equal(payload.length, 2); assert.equal(payload.length, 2);
assert.equal(typeof payload[0].ts, 'number'); assert.equal(typeof payload[0].ts, 'number');
assert.equal(payload[0].services, 1); assert.equal(payload[0].services, bcoin.protocol.constants.bcoinServices);
assert.equal(payload[0].ipv6, peers[0]._ipv6); assert.equal(payload[0].ipv6, peers[0]._ipv6);
assert.equal(payload[0].ipv4, peers[0]._ipv4); assert.equal(payload[0].ipv4, peers[0]._ipv4);
assert.equal(payload[0].port, peers[0].port); assert.equal(payload[0].port, peers[0].port);
assert.equal(typeof payload[1].ts, 'number'); assert.equal(typeof payload[1].ts, 'number');
assert.equal(payload[1].services, 1); assert.equal(payload[1].services, bcoin.protocol.constants.bcoinServices);
assert.equal(payload[1].ipv6, peers[1]._ipv6); assert.equal(payload[1].ipv6, peers[1]._ipv6);
assert.equal(payload[1].ipv4, peers[1]._ipv4); assert.equal(payload[1].ipv4, peers[1]._ipv4);
assert.equal(payload[1].port, peers[1].port); assert.equal(payload[1].port, peers[1].port);