From d21aadf6b370a1989edc2eec324069dd9204ecec Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 25 Aug 2016 23:32:54 -0700 Subject: [PATCH] peer: refactor. --- lib/net/packets.js | 35 ++++++++++++- lib/net/peer.js | 111 ++++++++++++++++++++--------------------- lib/net/pool.js | 27 +++++----- lib/net/proxysocket.js | 20 +++++++- 4 files changed, 118 insertions(+), 75 deletions(-) diff --git a/lib/net/packets.js b/lib/net/packets.js index cd645eee..72142417 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -934,7 +934,7 @@ NetworkAddress.prototype.inspect = function inspect() { * Inject properties from hostname and network. * @private * @param {String} hostname - * @param {Network|NetworkType} network + * @param {(Network|NetworkType)?} network */ NetworkAddress.prototype.fromHostname = function fromHostname(hostname, network) { @@ -966,6 +966,39 @@ NetworkAddress.fromHostname = function fromHostname(hostname, network) { return new NetworkAddress().fromHostname(hostname, network); }; +/** + * Inject properties from socket. + * @private + * @param {net.Socket} socket + */ + +NetworkAddress.prototype.fromSocket = function fromSocket(socket) { + assert(typeof socket.remoteAddress === 'string'); + assert(typeof socket.remotePort === 'number'); + + this.host = IP.normalize(socket.remoteAddress); + this.port = socket.remotePort; + this.services = constants.services.NETWORK + | constants.services.BLOOM + | constants.services.WITNESS; + this.ts = bcoin.now(); + + this.hostname = IP.hostname(this.host, this.port); + + return this; +}; + +/** + * Instantiate a network address + * from a socket. + * @param {net.Socket} socket + * @returns {NetworkAddress} + */ + +NetworkAddress.fromSocket = function fromSocket(hostname) { + return new NetworkAddress().fromSocket(hostname); +}; + /** * Inject properties from serialized data. * @private diff --git a/lib/net/peer.js b/lib/net/peer.js index 52773505..fc872257 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -68,24 +68,21 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket; * @emits Peer#ack */ -function Peer(pool, options) { +function Peer(pool, host, socket) { if (!(this instanceof Peer)) - return new Peer(pool, options); + return new Peer(pool, host, socket); EventEmitter.call(this); - if (!options) - options = {}; - - this.options = options; this.pool = pool; + this.options = pool.options; this.logger = pool.logger; this.socket = null; this.outbound = false; this.host = null; this.port = 0; this.hostname = null; - this._createSocket = this.pool.options.createSocket; + this.createSocket = this.options.createSocket; this.chain = this.pool.chain; this.mempool = this.pool.mempool; this.network = this.chain.network; @@ -94,7 +91,7 @@ function Peer(pool, options) { this.destroyed = false; this.ack = false; this.connected = false; - this.ts = options.ts || 0; + this.ts = 0; this.preferHeaders = false; this.haveWitness = false; this.hashContinue = null; @@ -123,9 +120,9 @@ function Peer(pool, options) { this.banScore = 0; this.pingTimer = null; - this.pingInterval = options.pingInterval || 120000; + this.pingInterval = 120000; - this.requestTimeout = options.requestTimeout || 10000; + this.requestTimeout = 10000; this.requestMap = {}; this.queueBlock = []; @@ -136,30 +133,23 @@ function Peer(pool, options) { this.setMaxListeners(10000); - if (options.socket) { - this.socket = options.socket; - this.host = IP.normalize(this.socket.remoteAddress); - this.port = this.socket.remotePort; - this.connected = true; - this.outbound = false; - } else if (options.host) { - this.host = options.host.host; - this.port = options.host.port; - this.socket = this.createSocket(this.port, this.host); + assert(host, 'Host required.'); + + this.host = host.host; + this.port = host.port; + this.hostname = host.hostname; + + if (!socket) { + this.socket = this.connect(this.port, this.host); this.outbound = true; } else { - assert(false, 'No seed or socket.'); + this.socket = socket; + this.connected = true; } - assert(typeof this.host === 'string'); - assert(typeof this.port === 'number'); - assert(this.socket, 'No socket.'); - - this.hostname = IP.hostname(this.host, this.port); - - if (this.pool.options.bip151) { + if (this.options.bip151) { this.bip151 = new bcoin.bip151(); - if (this.pool.options.bip150) { + if (this.options.bip150) { this.bip150 = new bcoin.bip150( this.bip151, this.hostname, @@ -357,7 +347,7 @@ Peer.prototype._onHandshake = function _onHandshake() { // Advertise our address. if (this.pool.address.host !== '0.0.0.0' - && !this.pool.options.selfish + && !this.options.selfish && this.pool.server) { this.write(this.framer.addr([this.pool.address])); } @@ -386,7 +376,6 @@ Peer.prototype._onAck = function _onAck(err) { } this.ack = true; - this.ts = utils.now(); // Setup the ping interval. this.pingTimer = setInterval(function() { @@ -394,7 +383,7 @@ Peer.prototype._onAck = function _onAck(err) { }, this.pingInterval); // Ask for headers-only. - if (this.pool.options.headers) { + if (this.options.headers) { if (this.version.version >= 70012) this.write(this.framer.sendHeaders()); } @@ -402,13 +391,13 @@ Peer.prototype._onAck = function _onAck(err) { // Let them know we support segwit (old // segwit3 nodes require this instead // of service bits). - if (this.pool.options.witness && this.network.oldWitness) { + if (this.options.witness && this.network.oldWitness) { if (this.version.version >= 70012) this.write(this.framer.haveWitness()); } // We want compact blocks! - if (this.pool.options.compact) { + if (this.options.compact) { if (this.version.version >= 70014) this.sendCompact(); } @@ -450,13 +439,14 @@ Peer.prototype._onAck = function _onAck(err) { * @returns {net.Socket} */ -Peer.prototype.createSocket = function createSocket(port, host) { +Peer.prototype.connect = function connect(port, host) { var self = this; - var hostname = IP.hostname(host, port); var socket, proxy, net; - if (this._createSocket) { - socket = this._createSocket(port, host); + assert(!this.socket); + + if (this.createSocket) { + socket = this.createSocket(port, host); } else { if (utils.isBrowser) { proxy = require('./proxysocket'); @@ -467,10 +457,10 @@ Peer.prototype.createSocket = function createSocket(port, host) { } } - this.logger.debug('Connecting to %s.', hostname); + this.logger.debug('Connecting to %s.', this.hostname); socket.once('connect', function() { - self.logger.info('Connected to %s.', hostname); + self.logger.info('Connected to %s.', self.hostname); }); this._connectTimeout = setTimeout(function() { @@ -628,7 +618,7 @@ Peer.prototype.sendVersion = function sendVersion() { nonce: this.pool.localNonce, agent: constants.USER_AGENT, height: this.chain.height, - relay: this.pool.options.relay + relay: this.options.relay }); this.write(this.framer.version(packet)); @@ -685,7 +675,7 @@ Peer.prototype.isWatched = function isWatched(item) { */ Peer.prototype.updateWatch = function updateWatch() { - if (!this.pool.options.spv) + if (!this.options.spv) return; this.write(this.framer.filterLoad(this.pool.spvFilter)); @@ -722,13 +712,16 @@ Peer.prototype.destroy = function destroy() { if (this.bip150) this.bip150.destroy(); - this.emit('close'); - - if (this.pingTimer) { + if (this.pingTimer != null) { clearInterval(this.pingTimer); this.pingTimer = null; } + if (this._connectTimeout != null) { + clearTimeout(this._connectTimeout); + this._connectTimeout = null; + } + keys = Object.keys(this.requestMap); for (i = 0; i < keys.length; i++) { @@ -738,6 +731,8 @@ Peer.prototype.destroy = function destroy() { for (j = 0; j < queue.length; j++) queue[j].destroy(); } + + this.emit('close'); }; /** @@ -849,7 +844,7 @@ Peer.prototype.getData = function getData(items) { if (item.toInv) item = item.toInv(); - if (this.pool.options.compact + if (this.options.compact && this.compactMode && item.isBlock() && !item.hasWitness()) { @@ -1115,7 +1110,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { if (!this.pool.synced) return done(); - if (this.pool.options.selfish) + if (this.options.selfish) return done(); if (this.chain.db.options.spv) @@ -1199,7 +1194,7 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) { if (!this.pool.synced) return done(); - if (this.pool.options.selfish) + if (this.options.selfish) return done(); if (this.chain.db.options.spv) @@ -1282,7 +1277,7 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) { if (!this.pool.synced) return done(); - if (this.pool.options.selfish) + if (this.options.selfish) return done(); if (this.chain.db.options.spv) @@ -1353,7 +1348,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { } } - if (this.pool.options.headers) { + if (this.options.headers) { if (!version.hasHeaders()) { this._error('Peer doesn\'t support getheaders.'); this.ignore(); @@ -1361,7 +1356,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { } } - if (this.pool.options.spv) { + if (this.options.spv) { if (!version.hasBloom()) { this._error('Peer does not support bip37.'); this.ignore(); @@ -1369,7 +1364,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { } } - if (this.pool.options.witness) { + if (this.options.witness) { if (!version.hasWitness()) { if (!this.network.oldWitness) { this._error('Peer does not support segregated witness.'); @@ -1426,7 +1421,7 @@ Peer.prototype._handleMempool = function _handleMempool() { if (!this.pool.synced) return done(); - if (this.pool.options.selfish) + if (this.options.selfish) return done(); hashes = this.mempool.getSnapshot(); @@ -1463,7 +1458,7 @@ Peer.prototype._getItem = function _getItem(item, callback) { return callback(null, entry.msg); } - if (this.pool.options.selfish) + if (this.options.selfish) return callback(); if (item.isTX()) { @@ -1712,7 +1707,7 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() { var items = []; var i, host; - if (this.pool.options.selfish) + if (this.options.selfish) return; if (this.sentAddr) { @@ -1993,7 +1988,7 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) { var hash = block.hash('hex'); var result; - if (!this.pool.options.compact) { + if (!this.options.compact) { this.logger.info('Peer sent unsolicited cmpctblock (%s).', this.hostname); return; } @@ -2062,7 +2057,7 @@ Peer.prototype._handleGetBlockTxn = function _handleGetBlockTxn(req) { if (this.chain.db.options.prune) return done(); - if (this.pool.options.selfish) + if (this.options.selfish) return done(); item = new InvItem(constants.inv.BLOCK, req.hash); @@ -2405,7 +2400,7 @@ Peer.prototype.sync = function sync(callback) { this.syncSent = true; - if (this.pool.options.headers) { + if (this.options.headers) { if (!this.chain.tip.isGenesis()) tip = this.chain.tip.prevBlock; diff --git a/lib/net/pool.js b/lib/net/pool.js index a355d1d5..90d264b1 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -481,7 +481,7 @@ Pool.prototype.unlisten = function unlisten(callback) { */ Pool.prototype._handleLeech = function _handleLeech(socket) { - var hostname, addr; + var addr; if (!socket.remoteAddress) { this.logger.debug('Ignoring disconnected leech.'); @@ -489,23 +489,22 @@ Pool.prototype._handleLeech = function _handleLeech(socket) { return; } - hostname = IP.hostname(socket.remoteAddress, socket.remotePort); - addr = NetworkAddress.fromHostname(hostname, this.network); + addr = NetworkAddress.fromSocket(socket); if (this.peers.leeches.length >= this.maxLeeches) { - this.logger.debug('Ignoring leech: too many leeches (%s).', hostname); + this.logger.debug('Ignoring leech: too many leeches (%s).', addr.hostname); socket.destroy(); return; } if (this.hosts.isMisbehaving(addr)) { - this.logger.debug('Ignoring misbehaving leech (%s).', hostname); + this.logger.debug('Ignoring misbehaving leech (%s).', addr.hostname); socket.destroy(); return; } if (this.hosts.isIgnored(addr)) { - this.logger.debug('Ignoring leech (%s).', hostname); + this.logger.debug('Ignoring leech (%s).', addr.hostname); socket.destroy(); return; } @@ -513,12 +512,12 @@ Pool.prototype._handleLeech = function _handleLeech(socket) { // Some kind of weird port collision // between inbound ports and outbound ports. if (this.peers.get(addr)) { - this.logger.debug('Port collision (%s).', hostname); + this.logger.debug('Port collision (%s).', addr.hostname); socket.destroy(); return; } - this.addLeech(socket); + this.addLeech(addr, socket); }; /** @@ -630,7 +629,7 @@ Pool.prototype.addLoader = function addLoader() { return; } - peer = this.createPeer({ host: addr }); + peer = this.createPeer(addr); this.logger.info('Added loader peer (%s).', peer.hostname); @@ -1014,9 +1013,9 @@ Pool.prototype.sendAlert = function sendAlert(alert) { * @returns {Peer} */ -Pool.prototype.createPeer = function createPeer(options) { +Pool.prototype.createPeer = function createPeer(addr, socket) { var self = this; - var peer = new bcoin.peer(this, options); + var peer = new bcoin.peer(this, addr, socket); peer.once('close', function() { self.removePeer(peer); @@ -1337,14 +1336,14 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { * @param {net.Socket} socket */ -Pool.prototype.addLeech = function addLeech(socket) { +Pool.prototype.addLeech = function addLeech(addr, socket) { var self = this; var peer; if (!this.loaded) return socket.destroy(); - peer = this.createPeer({ socket: socket }); + peer = this.createPeer(addr, socket); this.logger.info('Added leech peer (%s).', peer.hostname); @@ -1380,7 +1379,7 @@ Pool.prototype.addPeer = function addPeer() { if (!addr) return; - peer = this.createPeer({ host: addr }); + peer = this.createPeer(addr); this.peers.addPending(peer); diff --git a/lib/net/proxysocket.js b/lib/net/proxysocket.js index cf16e05c..0893cdc8 100644 --- a/lib/net/proxysocket.js +++ b/lib/net/proxysocket.js @@ -8,8 +8,6 @@ var EventEmitter = require('events').EventEmitter; var IOClient = require('socket.io-client'); function ProxySocket(uri) { - var self = this; - if (!(this instanceof ProxySocket)) return new ProxySocket(uri); @@ -20,9 +18,19 @@ function ProxySocket(uri) { this.socket = new IOClient(uri, { reconnection: false }); this.sendBuffer = []; this.snonce = null; + this.bytesWritten = 0; + this.bytesRead = 0; + this.remoteAddress = null; + this.remotePort = 0; this.closed = false; + this._init(); +}; + +ProxySocket.prototype._init = function _init() { + var self = this; + this.socket.on('info', function(info) { if (self.closed) return; @@ -48,6 +56,7 @@ function ProxySocket(uri) { }); this.socket.on('tcp data', function(data) { + self.bytesRead += data.length / 2; self.emit('data', new Buffer(data, 'hex')); }); @@ -78,6 +87,9 @@ ProxySocket.prototype.connect = function connect(port, host) { var nonce = 0; var i, pow; + this.remoteAddress = host; + this.remotePort = port; + if (this.closed) { this.sendBuffer.length = 0; return; @@ -116,11 +128,15 @@ ProxySocket.prototype.connect = function connect(port, host) { }; ProxySocket.prototype.write = function write(data) { + this.bytesWritten += data.length; + if (!this.info) { this.sendBuffer.push(data); return true; } + this.socket.emit('tcp data', data.toString('hex')); + return true; };