From 51b2e1f9a9fc436363bbd748a7c656d7641cdb64 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sun, 6 Mar 2016 00:57:18 -0800 Subject: [PATCH] improve sync. misc fixes. --- lib/bcoin/ec.js | 2 +- lib/bcoin/peer.js | 11 +- lib/bcoin/pool.js | 330 +++++++++++++++++++++++++----------------- test/protocol-test.js | 4 +- 4 files changed, 210 insertions(+), 137 deletions(-) diff --git a/lib/bcoin/ec.js b/lib/bcoin/ec.js index 40fa01de..a0b39dcf 100644 --- a/lib/bcoin/ec.js +++ b/lib/bcoin/ec.js @@ -33,7 +33,7 @@ ec.generatePrivateKey = function generatePrivateKey() { ec.publicKeyCreate = function publicKeyCreate(priv, compressed) { assert(Buffer.isBuffer(priv)); - if (bcoin.ecdsa.secp256k1) + if (bcoin.secp256k1) return bcoin.secp256k1.publicKeyCreate(priv, compressed); priv = bcoin.ecdsa.keyPair({ priv: priv }).getPublic(compressed, 'array'); diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 3cdf3492..ac91dc1d 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -163,14 +163,17 @@ Peer.prototype._init = function init() { self.destroy(); return; } + self.ack = true; self.emit('ack'); self.ts = utils.now(); + self._write(self.framer.packet('getaddr', new Buffer([]))); - // if (self.pool.options.headers) { - // if (self.version && self.version.v > 70012) - // self._write(self.framer.packet('sendheaders', new Buffer([]))); - // } + + if (self.pool.options.headers) { + if (self.version && self.version.v > 70012) + self._write(self.framer.packet('sendheaders', new Buffer([]))); + } }); // Send hello diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 9b2d1fd8..21583c01 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -59,13 +59,14 @@ function Pool(node, options) { options.headers = true; } else { if (options.headers == null) - options.headers = false; + options.headers = true; } this.syncing = false; this.synced = false; this.busy = false; this.jobs = []; + this._scheduled = false; this.load = { timeout: options.loadTimeout || 120000, @@ -269,64 +270,80 @@ Pool.prototype._lock = function _lock(func, args, force) { }; }; -Pool.prototype.getBlocks = function getBlocks(peer, top, stop) { +Pool.prototype.getBlocks = function getBlocks(peer, top, stop, callback) { var self = this; + + callback = utils.ensure(callback); + this.chain.onFlush(function() { self.chain.getLocator(top, function(err, locator) { if (err) - throw err; + return callback(err); peer.getBlocks(locator, stop); + + callback(); }); }); }; -Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan) { +Pool.prototype.resolveOrphan = function resolveOrphan(peer, top, orphan, callback) { var self = this; + + callback = utils.ensure(callback); + assert(orphan); + this.chain.onFlush(function() { self.chain.getLocator(top, function(err, locator) { if (err) - throw err; + return callback(err); orphan = self.chain.getOrphanRoot(orphan); // Was probably resolved. if (!orphan) { utils.debug('Orphan root was already resolved.'); - return; + return callback(); } // If we're already processing the block // that would resolve this, ignore. // if (self.request.map[orphan.soil]) { // utils.debug('Already requested orphan "soil".'); - // return; + // return callback(); // } if (self.chain.hasPending(orphan.soil)) { utils.debug('Already processing orphan "soil".'); - return; + return callback(); } // if (self.chain.has(orphan.soil)) { // utils.debug('Already have orphan "soil". Race condition?'); - // return; + // return callback(); // } peer.getBlocks(locator, orphan.root); + + callback(); }); }); }; -Pool.prototype.getHeaders = function getHeaders(peer, top, stop) { +Pool.prototype.getHeaders = function getHeaders(peer, top, stop, callback) { var self = this; + + callback = utils.ensure(callback); + this.chain.onFlush(function() { self.chain.getLocator(top, function(err, locator) { if (err) - throw err; + return callback(err); peer.getHeaders(locator, stop); + + callback(); }); }); }; @@ -513,19 +530,31 @@ Pool.prototype._addLoader = function _addLoader() { peer.on('blocks', function(hashes) { if (!self.syncing) return; - self._handleInv(hashes, peer); + + self._handleInv(hashes, peer, function(err) { + if (err) + self.emit('error', err); + }); }); peer.on('headers', function(headers) { if (!self.syncing) return; - self._handleHeaders(headers, peer); + + self._handleHeaders(headers, peer, function(err) { + if (err) + self.emit('error', err); + }); }); } else { peer.on('blocks', function(hashes) { if (!self.syncing) return; - self._handleBlocks(hashes, peer); + + self._handleBlocks(hashes, peer, function(err) { + if (err) + self.emit('error', err); + }); }); } }; @@ -561,14 +590,16 @@ Pool.prototype.stopSync = function stopSync() { this._stopTimer(); }; -Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { +Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) { var self = this; - var i, header, last, block; + var last; assert(this.options.headers); + callback = utils.ensure(callback); + if (headers.length === 0) - return; + return callback(); utils.debug( 'Recieved %s headers from %s', @@ -577,26 +608,32 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { if (headers.length > 2000) { this.setMisbehavior(peer, 100); - return; + return callback(); } this.emit('headers', headers); - this.chain.onFlush(function() { - for (i = 0; i < headers.length; i++) { - header = headers[i]; - hash = header.hash('hex'); + // Reset interval to avoid calling getheaders unnecessarily + this._startInterval(); - if (last && header.prevBlock !== last) - break; + utils.forEachSerial(headers, function(header, next) { + hash = header.hash('hex'); - if (!header.verify()) - break; + if (last && header.prevBlock !== last) + return next(new Error('Bad header chain.')); - self.getData(peer, self.block.type, hash); + if (!header.verify()) + return next(new Error('Headers invalid.')); - last = hash; - } + last = hash; + + self.getData(peer, self.block.type, hash, next); + }, function(err) { + if (err) + return callback(err); + + // Schedule the getdata's we just added. + self.scheduleRequests(peer); // Restart the getheaders process // Technically `last` is not indexed yet so @@ -606,21 +643,21 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { // simply tries to find the latest block in // the peer's chain. if (last && headers.length === 2000) - self.getHeaders(peer, last, null); + self.getHeaders(peer, last, null, callback); + else + callback(); }); - - // Reset interval to avoid calling getheaders unnecessarily - this._startInterval(); }; -Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { +Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { var self = this; - var i, hash; assert(!this.options.headers); + callback = utils.ensure(callback); + if (hashes.length === 0) - return; + return callback(); utils.debug( 'Recieved %s block hashes from %s', @@ -636,58 +673,71 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this.emit('blocks', hashes); - this.chain.onFlush(function() { - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - - // Resolve orphan chain. - if (self.chain.hasOrphan(hash)) { - utils.debug('Peer sent a hash that is already a known orphan.'); - self.resolveOrphan(peer, null, hash); - continue; - } - - // Normally we request the hashContinue. - // In the odd case where we already have - // it, we can do one of two things: either - // force re-downloading of the block to - // continue the sync, or do a getblocks - // from the last hash. - if (i === hashes.length - 1) { - // Request more hashes: - // self.getBlocks(peer, hash, null); - - // Re-download the block (traditional method): - self.getData(peer, self.block.type, hash, { force: true }); - - continue; - } - - self.getData(peer, self.block.type, hash); - } - }); - // Reset interval to avoid calling getblocks unnecessarily this._startInterval(); // Reset timeout to avoid killing the loader this._startTimer(); + + utils.forEachSerial(hashes, function(hash, next, i) { + hash = utils.toHex(hash); + + // Resolve orphan chain. + if (self.chain.hasOrphan(hash)) { + utils.debug('Peer sent a hash that is already a known orphan.'); + self.resolveOrphan(peer, null, hash, next); + return; + } + + // Normally we request the hashContinue. + // In the odd case where we already have + // it, we can do one of two things: either + // force re-downloading of the block to + // continue the sync, or do a getblocks + // from the last hash. + if (i === hashes.length - 1) { + // Request more hashes: + // self.getBlocks(peer, hash, null, next); + // Re-download the block (traditional method): + self.getData(peer, self.block.type, hash, { force: true }, next); + return; + } + + // Request block. + self.getData(peer, self.block.type, hash, next); + }, function(err) { + if (err) + return callback(err); + + self.scheduleRequests(peer); + + callback(); + }); }; -Pool.prototype._handleInv = function _handleInv(hashes, peer) { - var i, hash; +Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) { + var self = this; // Ignore for now if we're still syncing if (!this.synced) return; - for (i = 0; i < hashes.length; i++) { - hash = utils.toHex(hashes[i]); - if (this.options.headers) - this.getHeaders(this.peers.load, null, hash); + callback = utils.ensure(callback); + + utils.forEachSerial(hashes, function(hash, next) { + hash = utils.toHex(hash); + if (self.options.headers) + self.getHeaders(peer, null, hash, next); else - this.getData(peer, this.block.type, hash); - } + self.getData(peer, self.block.type, hash, next); + }, function(err) { + if (err) + return callback(err); + + self.scheduleRequests(peer); + + callback(); + }); }; Pool.prototype._prehandleTX = function _prehandleTX(tx, peer, callback) { @@ -825,8 +875,8 @@ Pool.prototype._createPeer = function _createPeer(options) { if (self.options.discoverPeers === false) return; - if (self.seeds.length > 1000) - self.setSeeds(self.seeds.slice(-500)); + if (self.seeds.length > 300) + self.setSeeds(self.seeds.slice(-150)); self.addSeed(data); @@ -837,7 +887,7 @@ Pool.prototype._createPeer = function _createPeer(options) { self.emit('txs', txs, peer); if (!self.options.spv) { - if (!self.synced) + if (self.syncing && !self.synced) return; } @@ -973,17 +1023,22 @@ Pool.prototype._addLeech = function _addLeech(socket) { peer.on('merkleblock', function(block) { if (!self.options.spv) return; + self._handleBlock(block, peer); }); peer.on('block', function(block) { if (self.options.spv) return; + self._handleBlock(block, peer); }); peer.on('blocks', function(hashes) { - self._handleInv(hashes, peer); + self._handleInv(hashes, peer, function(err) { + if (err) + self.emit('error', err); + }); }); utils.nextTick(function() { @@ -1459,8 +1514,10 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { options = {}; } + callback = utils.ensure(callback); + if (this.destroyed) - return; + return callback(); if (!options) options = {}; @@ -1468,61 +1525,72 @@ Pool.prototype.getData = function getData(peer, type, hash, options, callback) { if (Buffer.isBuffer(hash)) hash = utils.toHex(hash); - function hasItem(cb) { - if (!options.force && type !== self.tx.type) - return self.chain.has(hash, cb); - return cb(null, false); - } - - if (self.request.map[hash]) { - if (callback) - self.request.map[hash].callback.push(callback); - return; - } - - item = new LoadRequest(self, peer, type, hash, callback); - - if (options.noQueue) - return; - - if (type === self.tx.type) { - if (peer.queue.tx.length === 0) { - utils.nextTick(function() { - utils.debug( - 'Requesting %d/%d txs from %s with getdata', - peer.queue.tx.length, - self.request.activeTX, - peer.host); - - peer.getData(peer.queue.tx); - peer.queue.tx.length = 0; - }); - } - - peer.queue.tx.push(item.start()); - - return; - } - - if (peer.queue.block.length === 0) { - self.chain.onFlush(function() { - utils.nextTick(function() { - self.scheduleRequests(peer); - }); - }); - } - - peer.queue.block.push(item); - - hasItem(function(err, exists) { - assert(!err); + function done(err, exists) { + if (err) + return callback(err); if (exists) - return item.finish(); - }); + return callback(); + + if (self.request.map[hash]) { + // if (callback) + // self.request.map[hash].callback.push(callback); + return callback(); + } + + // item = new LoadRequest(self, peer, type, hash, callback); + item = new LoadRequest(self, peer, type, hash); + + if (options.noQueue) + return callback(); + + if (type === self.tx.type) { + if (peer.queue.tx.length === 0) { + utils.nextTick(function() { + utils.debug( + 'Requesting %d/%d txs from %s with getdata', + peer.queue.tx.length, + self.request.activeTX, + peer.host); + + peer.getData(peer.queue.tx); + peer.queue.tx.length = 0; + }); + } + + peer.queue.tx.push(item.start()); + + return callback(); + } + + peer.queue.block.push(item); + + return callback(); + } + + if (!options.force && type !== self.tx.type) + return self.chain.has(hash, done); + + return done(null, false); }; Pool.prototype.scheduleRequests = function scheduleRequests(peer) { + var self = this; + + if (this._scheduled) + return; + + this._scheduled = true; + + this.chain.onFlush(function() { + utils.nextTick(function() { + self._sendRequests(peer); + self._scheduled = false; + }); + }); +}; + +Pool.prototype._sendRequests = function _sendRequests(peer) { var size, items; if (this.chain.pending.length > 0) @@ -1590,6 +1658,8 @@ Pool.prototype.getBlock = function getBlock(hash, callback) { this.getData(this.peers.load, 'block', hash, { force: true }, function(block) { callback(null, block); }); + + this.scheduleRequests(this.peers.load); }; Pool.prototype.sendBlock = function sendBlock(block) { diff --git a/test/protocol-test.js b/test/protocol-test.js index 187291f1..478e51a8 100644 --- a/test/protocol-test.js +++ b/test/protocol-test.js @@ -72,13 +72,13 @@ describe('Protocol', function() { assert.equal(payload.length, 2); assert.equal(typeof payload[0].ts, 'number'); - assert.equal(payload[0].services, 1); + assert.equal(payload[0].services, bcoin.protocol.constants.bcoinServices); assert.equal(payload[0].ipv6, peers[0]._ipv6); assert.equal(payload[0].ipv4, peers[0]._ipv4); assert.equal(payload[0].port, peers[0].port); assert.equal(typeof payload[1].ts, 'number'); - assert.equal(payload[1].services, 1); + assert.equal(payload[1].services, bcoin.protocol.constants.bcoinServices); assert.equal(payload[1].ipv6, peers[1]._ipv6); assert.equal(payload[1].ipv4, peers[1]._ipv4); assert.equal(payload[1].port, peers[1].port);