From 54dd7c4eb353d78cfca94dbc01ba4a45d99d319b Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Fri, 26 Aug 2016 00:57:07 -0700 Subject: [PATCH] pool: more refactoring. --- lib/net/framer.js | 2 + lib/net/peer.js | 115 ++++++++++++++++++++++------------------------ lib/net/pool.js | 115 +++++++++++++++++++++++++++------------------- 3 files changed, 126 insertions(+), 106 deletions(-) diff --git a/lib/net/framer.js b/lib/net/framer.js index ff9e67bc..6356eb05 100644 --- a/lib/net/framer.js +++ b/lib/net/framer.js @@ -102,6 +102,8 @@ Framer.prototype.verack = function verack() { */ Framer.prototype.ping = function ping(nonce) { + if (!nonce) + return this.packet('ping', DUMMY); return this.packet('ping', framePing(nonce)); }; diff --git a/lib/net/peer.js b/lib/net/peer.js index fc872257..1ef9b43e 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -27,17 +27,13 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket; * @exports Peer * @constructor * @param {Pool} pool - * @param {Object} options - * @param {Function?} options.createSocket - Callback which returns a - * node.js-like socket object. Necessary for browser. - * @param {Chain} options.chain - * @param {Mempool} options.mempool - * @param {Number?} options.ts - Time at which peer was discovered (unix time). - * @param {net.Socket?} options.socket - * @param {NetworkAddress?} options.host - Host to connect to. + * @param {NetworkAddress} addr + * @param {net.Socket?} socket * @property {Pool} pool * @property {net.Socket?} socket - * @property {String?} host + * @property {String} host + * @property {Number} port + * @property {String} hostname * @property {Number} port * @property {Parser} parser * @property {Framer} framer @@ -68,9 +64,9 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket; * @emits Peer#ack */ -function Peer(pool, host, socket) { +function Peer(pool, addr, socket) { if (!(this instanceof Peer)) - return new Peer(pool, host, socket); + return new Peer(pool, addr, socket); EventEmitter.call(this); @@ -103,7 +99,7 @@ function Peer(pool, host, socket) { this.lastBlock = null; this.waiting = 0; this.syncSent = false; - this._connectTimeout = null; + this.connectTimeout = null; this.compactMode = null; this.compactBlocks = {}; this.sentAddr = false; @@ -119,7 +115,7 @@ function Peer(pool, host, socket) { this.banScore = 0; - this.pingTimer = null; + this.pingTimeout = null; this.pingInterval = 120000; this.requestTimeout = 10000; @@ -133,11 +129,11 @@ function Peer(pool, host, socket) { this.setMaxListeners(10000); - assert(host, 'Host required.'); + assert(addr, 'Host required.'); - this.host = host.host; - this.port = host.port; - this.hostname = host.hostname; + this.host = addr.host; + this.port = addr.port; + this.hostname = addr.hostname; if (!socket) { this.socket = this.connect(this.port, this.host); @@ -189,7 +185,7 @@ Peer.prototype._init = function init() { }); this.socket.once('error', function(err) { - self._error(err); + self.error(err); switch (err.code) { case 'ECONNREFUSED': @@ -203,7 +199,7 @@ Peer.prototype._init = function init() { }); this.socket.once('close', function() { - self._error('socket hangup'); + self.error('socket hangup'); }); this.socket.on('data', function(chunk) { @@ -215,14 +211,14 @@ Peer.prototype._init = function init() { }); this.parser.on('error', function(err) { - self._error(err, true); + self.error(err, true); self.reject(null, 'malformed', 'error parsing message', 10); }); if (this.bip151) { this.bip151.on('error', function(err) { self.reject(null, 'malformed', 'error parsing message', 10); - self._error(err, true); + self.error(err, true); }); this.bip151.on('rekey', function() { self.logger.debug('Rekeying with peer (%s).', self.hostname); @@ -260,9 +256,9 @@ Peer.prototype._onConnect = function _onConnect() { this.emit('connect'); - if (this._connectTimeout != null) { - clearTimeout(this._connectTimeout); - this._connectTimeout = null; + if (this.connectTimeout != null) { + clearTimeout(this.connectTimeout); + this.connectTimeout = null; } // Send encinit. Wait for handshake to complete. @@ -272,7 +268,7 @@ Peer.prototype._onConnect = function _onConnect() { this.write(this.framer.encinit(this.bip151.toEncinit())); return this.bip151.wait(3000, function(err) { if (err) - self._error(err, true); + self.error(err, true); self._onBIP151(); }); } @@ -300,19 +296,19 @@ Peer.prototype._onBIP151 = function _onBIP151() { assert(!this.bip150.completed); if (!this.bip151.handshake) - return this._error('BIP151 handshake was not completed for BIP150.'); + return this.error('BIP151 handshake was not completed for BIP150.'); this.logger.info('Attempting BIP150 handshake (%s).', this.hostname); if (this.bip150.outbound) { if (!this.bip150.peerIdentity) - return this._error('No known identity for peer.'); + return this.error('No known identity for peer.'); this.write(this.framer.authChallenge(this.bip150.toChallenge())); } return this.bip150.wait(3000, function(err) { if (err) - return self._error(err); + return self.error(err); self._onHandshake(); }); } @@ -362,7 +358,7 @@ Peer.prototype._onAck = function _onAck(err) { var self = this; if (err) { - this._error(err); + this.error(err); return; } @@ -378,7 +374,7 @@ Peer.prototype._onAck = function _onAck(err) { this.ack = true; // Setup the ping interval. - this.pingTimer = setInterval(function() { + this.pingTimeout = setInterval(function() { self.sendPing(); }, this.pingInterval); @@ -463,8 +459,8 @@ Peer.prototype.connect = function connect(port, host) { self.logger.info('Connected to %s.', self.hostname); }); - this._connectTimeout = setTimeout(function() { - self._error('Connection timed out.'); + this.connectTimeout = setTimeout(function() { + self.error('Connection timed out.'); self.ignore(); }, 10000); @@ -633,7 +629,7 @@ Peer.prototype.sendPing = function sendPing() { return; if (this.version.version <= 60000) { - this.write(this.framer.packet('ping', new Buffer([]))); + this.write(this.framer.ping()); return; } @@ -712,14 +708,14 @@ Peer.prototype.destroy = function destroy() { if (this.bip150) this.bip150.destroy(); - if (this.pingTimer != null) { - clearInterval(this.pingTimer); - this.pingTimer = null; + if (this.pingTimeout != null) { + clearInterval(this.pingTimeout); + this.pingTimeout = null; } - if (this._connectTimeout != null) { - clearTimeout(this._connectTimeout); - this._connectTimeout = null; + if (this.connectTimeout != null) { + clearTimeout(this.connectTimeout); + this.connectTimeout = null; } keys = Object.keys(this.requestMap); @@ -756,7 +752,7 @@ Peer.prototype.write = function write(chunk) { * @param {String|Error} err */ -Peer.prototype._error = function error(err, keep) { +Peer.prototype.error = function error(err, keep) { if (this.destroyed) return; @@ -783,7 +779,7 @@ Peer.prototype.request = function request(cmd, callback) { var entry; if (this.destroyed) - return utils.asyncify(callback)(new Error('Destroyed, sorry')); + return callback(new Error('Destroyed')); entry = new RequestEntry(this, cmd, callback); @@ -1328,21 +1324,21 @@ Peer.prototype._handleVersion = function _handleVersion(version) { if (!this.network.selfConnect) { if (version.nonce.cmp(this.pool.localNonce) === 0) { - this._error('We connected to ourself. Oops.'); + this.error('We connected to ourself. Oops.'); this.ignore(); return; } } if (version.version < constants.MIN_VERSION) { - this._error('Peer doesn\'t support required protocol version.'); + this.error('Peer does not support required protocol version.'); this.ignore(); return; } if (this.outbound) { if (!version.hasNetwork()) { - this._error('Peer does not support network services.'); + this.error('Peer does not support network services.'); this.ignore(); return; } @@ -1350,7 +1346,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { if (this.options.headers) { if (!version.hasHeaders()) { - this._error('Peer doesn\'t support getheaders.'); + this.error('Peer does not support getheaders.'); this.ignore(); return; } @@ -1358,7 +1354,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { if (this.options.spv) { if (!version.hasBloom()) { - this._error('Peer does not support bip37.'); + this.error('Peer does not support BIP37.'); this.ignore(); return; } @@ -1367,13 +1363,13 @@ Peer.prototype._handleVersion = function _handleVersion(version) { if (this.options.witness) { if (!version.hasWitness()) { if (!this.network.oldWitness) { - this._error('Peer does not support segregated witness.'); + this.error('Peer does not support segregated witness.'); this.ignore(); return; } this.request('havewitness', function(err) { if (err) { - self._error('Peer does not support segregated witness.'); + self.error('Peer does not support segregated witness.'); self.ignore(); } }); @@ -1499,7 +1495,7 @@ Peer.prototype._handleGetData = function _handleGetData(items) { } if (items.length > 50000) { - this._error('getdata size too large (%s).', items.length); + this.error('getdata size too large (%s).', items.length); return done(); } @@ -1705,7 +1701,7 @@ Peer.prototype._handlePong = function _handlePong(nonce) { Peer.prototype._handleGetAddr = function _handleGetAddr() { var items = []; - var i, host; + var i, addr; if (this.options.selfish) return; @@ -1718,15 +1714,15 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() { this.sentAddr = true; for (i = 0; i < this.pool.hosts.items.length; i++) { - host = this.pool.hosts.items[i]; + addr = this.pool.hosts.items[i]; - if (!host.isIP()) + if (!addr.isIP()) continue; - if (!this.addrFilter.added(host.host, 'ascii')) + if (!this.addrFilter.added(addr.host, 'ascii')) continue; - items.push(host); + items.push(addr); if (items.length === 1000) break; @@ -1848,7 +1844,7 @@ Peer.prototype._handleEncinit = function _handleEncinit(data) { try { this.bip151.encinit(data); } catch (e) { - this._error(e); + this.error(e); return; } @@ -1870,7 +1866,7 @@ Peer.prototype._handleEncack = function _handleEncack(data) { try { this.bip151.encack(data); } catch (e) { - this._error(e); + this.error(e); return; } @@ -1892,7 +1888,7 @@ Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(data) { try { result = this.bip150.challenge(data); } catch (e) { - this._error(e); + this.error(e); return; } @@ -1916,7 +1912,7 @@ Peer.prototype._handleAuthReply = function _handleAuthReply(data) { try { result = this.bip150.reply(data); } catch (e) { - this._error(e); + this.error(e); return; } @@ -1941,7 +1937,7 @@ Peer.prototype._handleAuthPropose = function _handleAuthPropose(data) { try { result = this.bip150.propose(data); } catch (e) { - this._error(e); + this.error(e); return; } @@ -2420,6 +2416,7 @@ Peer.prototype.inspect = function inspect() { + ' id=' + this.id + ' ack=' + this.ack + ' host=' + this.hostname + + ' outbound=' + this.outbound + ' ping=' + this.minPing + '>'; }; diff --git a/lib/net/pool.js b/lib/net/pool.js index 90d264b1..c2d610a1 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -141,7 +141,7 @@ function Pool(options) { this.scheduled = false; this.pendingWatch = null; - this.timer = null; + this.timeout = null; this.interval = null; this._initOptions(); @@ -274,7 +274,7 @@ Pool.prototype._init = function _init() { }); this.chain.on('full', function() { - self.stopTimer(); + self.stopTimeout(); self.stopInterval(); if (!self.synced) { @@ -376,7 +376,12 @@ Pool.prototype._close = function close(callback) { this.peers.destroy(); this.stopInterval(); - this.stopTimer(); + this.stopTimeout(); + + if (this.pendingWatch != null) { + clearTimeout(this.pendingWatch); + this.pendingWatch = null; + } this.unlisten(callback); }; @@ -521,15 +526,13 @@ Pool.prototype._handleLeech = function _handleLeech(socket) { }; /** - * Start timer to detect stalling. + * Start timeout to detect stalling. * @private */ -Pool.prototype.startTimer = function startTimer() { +Pool.prototype.startTimeout = function startTimeout() { var self = this; - this.stopTimer(); - function destroy() { if (!self.syncing) return; @@ -543,25 +546,27 @@ Pool.prototype.startTimer = function startTimer() { } } - this.timer = setTimeout(destroy, this.loadTimeout); + this.stopTimeout(); + + this.timeout = setTimeout(destroy, this.loadTimeout); }; /** - * Stop the stall timer (done on chain sync). + * Stop the stall timeout (done on chain sync). * @private */ -Pool.prototype.stopTimer = function stopTimer() { - if (this.timer != null) { - clearTimeout(this.timer); - this.timer = null; +Pool.prototype.stopTimeout = function stopTimeout() { + if (this.timeout != null) { + clearTimeout(this.timeout); + this.timeout = null; } }; /** * Start the stall interval (shorter than the - * stall timer, inteded to give warnings and - * reset the stall *timer* if the chain is + * stall timeout, inteded to give warnings and + * reset the stall *timeout* if the chain is * busy). Stopped on chain sync. * @private */ @@ -569,9 +574,10 @@ Pool.prototype.stopTimer = function stopTimer() { Pool.prototype.startInterval = function startInterval() { var self = this; - this.stopInterval(); - function load() { + var peer = self.peers.load; + var hostname = peer ? peer.hostname : null; + if (!self.syncing) return; @@ -579,11 +585,13 @@ Pool.prototype.startInterval = function startInterval() { return self.stopInterval(); if (self.chain.isBusy()) - return self.startTimer(); + return self.startTimeout(); - self.logger.warning('Stalling.'); + self.logger.warning('Loader peer is stalling (%s).', hostname); } + this.stopInterval(); + this.interval = setInterval(load, this.loadTimeout / 6 | 0); }; @@ -620,9 +628,15 @@ Pool.prototype.addLoader = function addLoader() { addr = this.getLoaderHost(); peer = this.peers.get(addr); + if (this.syncing) { + this.startTimeout(); + this.startInterval(); + } + if (peer) { + this.logger.info('Repurposing peer for loader (%s).', peer.hostname); this.peers.repurpose(peer); - this.logger.info('Repurposed peer for loader (%s).', peer.hostname); + this.fillPeers(); utils.nextTick(function() { self.emit('loader', peer); }); @@ -649,7 +663,7 @@ Pool.prototype.startSync = function startSync() { this.syncing = true; this.startInterval(); - this.startTimer(); + this.startTimeout(); this.connect(); @@ -692,7 +706,7 @@ Pool.prototype.stopSync = function stopSync() { return; this.stopInterval(); - this.stopTimer(); + this.stopTimeout(); if (this.peers.load) this.peers.load.syncSent = false; @@ -733,7 +747,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback) // Reset interval to avoid stall behavior. this.startInterval(); // Reset timeout to avoid killing the loader. - this.startTimer(); + this.startTimeout(); } utils.forEachSerial(headers, function(header, next) { @@ -802,7 +816,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) { // Reset interval to avoid stall behavior. this.startInterval(); // Reset timeout to avoid killing the loader. - this.startTimer(); + this.startTimeout(); } utils.forEachSerial(hashes, function(hash, next, i) { @@ -1029,7 +1043,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) { } self.stopInterval(); - self.stopTimer(); + self.stopTimeout(); if (self.peers.regular.length === 0) { self.logger.warning('%s %s %s', @@ -1060,7 +1074,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) { if (peer.isLoader()) { self.startInterval(); - self.startTimer(); + self.startTimeout(); } }); }); @@ -1080,7 +1094,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) { if (peer.isLoader()) { self.startInterval(); - self.startTimer(); + self.startTimeout(); } }); }); @@ -1133,33 +1147,33 @@ Pool.prototype.createPeer = function createPeer(addr, socket) { }); }); - peer.on('addr', function(hosts) { - var i, host; + peer.on('addr', function(addrs) { + var i, addr; if (self.options.ignoreDiscovery) return; - for (i = 0; i < hosts.length; i++) { - host = hosts[i]; + for (i = 0; i < addrs.length; i++) { + addr = addrs[i]; - if (!host.hasNetwork()) + if (!addr.hasNetwork()) continue; if (self.options.spv) { - if (!host.hasBloom()) + if (!addr.hasBloom()) continue; } if (self.options.witness) { - if (!host.hasWitness()) + if (!addr.hasWitness()) continue; } - if (self.hosts.add(host)) - self.emit('host', host, peer); + if (self.hosts.add(addr)) + self.emit('host', addr, peer); } - self.emit('addr', hosts, peer); + self.emit('addr', addrs, peer); self.fillPeers(); }); @@ -1398,11 +1412,13 @@ Pool.prototype.addPeer = function addPeer() { */ Pool.prototype.fillPeers = function fillPeers() { - this.logger.debug('Refilling peers (%d/%d).', - this.peers.pending.length + this.peers.regular.length, - this.maxPeers - 1); + var i; - for (var i = 0; i < this.maxPeers - 1; i++) + this.logger.debug('Refilling peers (%d/%d).', + this.peers.all.length - this.peers.leeches.length, + this.maxPeers); + + for (i = 0; i < this.maxPeers - 1; i++) this.addPeer(); }; @@ -1415,6 +1431,11 @@ Pool.prototype.fillPeers = function fillPeers() { Pool.prototype.removePeer = function removePeer(peer) { var i, hashes, hash, item; + if (peer.isLoader() && this.syncing) { + this.stopTimeout(); + this.stopInterval(); + } + this.peers.remove(peer); hashes = Object.keys(this.requestMap); @@ -1770,7 +1791,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) { if (peer.banScore >= constants.BAN_SCORE) { this.ban(peer); - this.logger.debug('Ban threshold exceeded (%s).', peer.host); + this.logger.debug('Ban threshold exceeded (%s).', peer.hostname); return true; } @@ -1779,7 +1800,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) { /** * Ban a peer. - * @param {NetworkAddress} host + * @param {NetworkAddress} addr */ Pool.prototype.ban = function ban(addr) { @@ -1802,7 +1823,7 @@ Pool.prototype.unban = function unban(addr) { }; /** - * Test whether the host/peer is banned. + * Test whether the host is banned. * @param {NetworkAddress} addr * @returns {Boolean} */ @@ -1827,7 +1848,7 @@ Pool.prototype.ignore = function ignore(addr) { }; /** - * Test whether the host/peer is ignored. + * Test whether the host is ignored. * @param {NetworkAddress} addr * @returns {Boolean} */ @@ -2160,7 +2181,7 @@ HostList.prototype.unban = function unban(addr) { }; /** - * Test whether the host/peer is banned. + * Test whether the host is banned. * @param {NetworkAddress} addr * @returns {Boolean} */ @@ -2291,7 +2312,7 @@ LoadRequest.prototype.addCallback = function addCallback(callback) { }; /** - * Mark the request as in-flight. Start timeout timer. + * Mark the request as in-flight. Start timeout. */ LoadRequest.prototype.start = function start() {