From c11a651e9dc5fc30455b29376461b11f5a93d8eb Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 19 Jan 2016 17:52:29 -0800 Subject: [PATCH] better loader management. --- lib/bcoin/peer.js | 24 ++++++-- lib/bcoin/pool.js | 148 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 129 insertions(+), 43 deletions(-) diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 56fcead8..cb45da20 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -24,11 +24,11 @@ try { * Peer */ -function Peer(pool, createSocket, options) { +function Peer(pool, createConnection, options) { var self = this; if (!(this instanceof Peer)) - return new Peer(pool, createSocket, options); + return new Peer(pool, createConnection, options); EventEmitter.call(this); @@ -42,6 +42,7 @@ function Peer(pool, createSocket, options) { this.version = null; this.destroyed = false; this.ack = false; + this.connected = false; this.ts = this.options.ts || 0; this.host = null; @@ -52,13 +53,13 @@ function Peer(pool, createSocket, options) { if (this.options.backoff) { setTimeout(function() { - self.socket = createSocket(self, pool); + self.socket = createConnection(self, pool, options); if (!self.socket) throw new Error('No socket'); self.emit('socket'); }, this.options.backoff); } else { - this.socket = createSocket(this, pool); + this.socket = createConnection(this, pool, options); if (!this.socket) throw new Error('No socket'); } @@ -107,10 +108,12 @@ Peer.prototype._init = function init() { self.host = self.socket.remoteAddress; if (!self.port) self.port = self.socket.remotePort; + self.emit('connect'); }); this.socket.once('error', function(err) { self._error(err); + self.emit('misbehave'); }); this.socket.once('close', function() { @@ -131,6 +134,7 @@ Peer.prototype._init = function init() { // Something is wrong here. // Ignore this peer. self.destroy(); + self.emit('misbehave'); }); if (this.pool.options.fullNode) { @@ -152,12 +156,17 @@ Peer.prototype._init = function init() { if (err) { self._error(err); self.destroy(); + self.emit('misbehave'); return; } self.ack = true; self.emit('ack'); self.ts = utils.now(); self._write(self.framer.packet('getaddr', [])); + // if (self.pool.options.headers) { + // if (self.version.v > 70012) + // self._write(self.framer.packet('sendheaders', [])); + // } }); // Send hello @@ -291,8 +300,13 @@ Peer.prototype._error = function error(err) { if (this.destroyed) return; + if (typeof err === 'string') + err = new Error(err); + + err.message += ' (' + this.host + ')'; + this.destroy(); - this.emit('error', typeof err === 'string' ? new Error(err) : err); + this.emit('error', err); }; Peer.prototype._req = function _req(cmd, cb) { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 04ec7ebf..b423422c 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -37,9 +37,12 @@ function Pool(options) { this.options.relay = this.options.relay == null ? (this.options.fullNode ? true : false) : this.options.relay; - this.options.seeds = options.seeds || network.seeds; - this.setSeeds(this.options.seeds); + this.originalSeeds = (options.seeds || network.seeds).map(utils.parseHost); + this.setSeeds([]); + this._priorityTries = {}; + this._regularTries = {}; + this._misbehaving = {}; this.storage = this.options.storage; this.destroyed = false; @@ -266,26 +269,32 @@ Pool.prototype._stopInterval = function _stopInterval() { delete this._interval; }; -Pool.prototype.createConnection = function createConnection(peer, pool) { +Pool.prototype.createConnection = function createConnection(peer, pool, options) { var addr, net, socket; if (pool._createConnection) return pool._createConnection(peer, pool); - addr = pool.usableSeed(true); + addr = pool.usableSeed(options.priority, true); peer.host = addr.host; peer.port = addr.port; - if (this._createSocket) { - socket = this._createSocket(addr.port, addr.host); + if (pool._createSocket) { + socket = pool._createSocket(addr.port, addr.host); } else { net = require('net'); socket = net.connect(addr.port, addr.host); } + pool.emit('debug', + 'Connecting to %s:%d (priority=%s)', + addr.host, addr.port, options.priority); + socket.on('connect', function() { - pool.emit('debug', 'Connected to %s:%d', addr.host, addr.port); + pool.emit('debug', + 'Connected to %s:%d (priority=%s)', + addr.host, addr.port, options.priority); }); return socket; @@ -301,7 +310,7 @@ Pool.prototype._addLoader = function _addLoader() { if (this.peers.load != null) return; - peer = this._createPeer(750 * Math.random()); + peer = this._createPeer(750 * Math.random(), true); peer.once('socket', function() { self.emit('debug', 'Added loader peer: %s', peer.host); @@ -658,13 +667,14 @@ Pool.prototype.loadMempool = function loadMempool() { }); }; -Pool.prototype._createPeer = function _createPeer(backoff) { +Pool.prototype._createPeer = function _createPeer(backoff, priority) { var self = this; var peer = new bcoin.peer(this, this.createConnection, { backoff: backoff, startHeight: this.options.startHeight, - relay: this.options.relay + relay: this.options.relay, + priority: priority }); peer._retry = 0; @@ -678,15 +688,19 @@ Pool.prototype._createPeer = function _createPeer(backoff) { self.emit.apply(self, ['debug'].concat(args)); }); + peer.once('misbehave', function() { + self._misbehaving[peer.host] = true; + }); + peer.on('reject', function(payload) { - payload.data = utils.revHex(utils.toHex(payload.data)); + var data = utils.revHex(utils.toHex(payload.data)); self.emit('debug', 'Reject: msg=%s ccode=%s reason=%s data=%s', payload.message, payload.ccode, payload.reason, - payload.data); + data); self.emit('reject', payload, peer); }); @@ -709,7 +723,7 @@ Pool.prototype._createPeer = function _createPeer(backoff) { peer.on('addr', function(data) { if (self.seeds.length > 1000) - self.setSeeds(self.options.seeds.concat(self.seeds.slice(-500))); + self.setSeeds(self.seeds.slice(-500)); self.addSeed(data); @@ -724,6 +738,9 @@ Pool.prototype._createPeer = function _createPeer(backoff) { if (version.height > self.block.bestHeight) self.block.bestHeight = version.height; self.emit('version', version, peer); + self.emit('debug', + 'Received version from %s: version=%d height=%d agent=%s', + peer.host, peer.version.v, peer.version.height, peer.version.agent); }); return peer; @@ -744,7 +761,7 @@ Pool.prototype._addPeer = function _addPeer(backoff) { return; } - peer = this._createPeer(backoff); + peer = this._createPeer(backoff, false); this.peers.pending.push(peer); this.peers.all.push(peer); @@ -1477,40 +1494,95 @@ Pool.prototype.getPeer = function getPeer(addr) { } }; -Pool.prototype.usableSeed = function usableSeed(addrs, force) { +Pool.prototype.usableSeed = function usableSeed(priority, connecting) { var i, addr; + var original = this.originalSeeds; + var seeds = this.seeds; + var tries = priority ? this._priorityTries : this._regularTries; - if (typeof addrs === 'boolean') { - force = addrs; - addrs = null; + // Hang back if we don't have a loader peer yet. + if (!connecting && !priority && (!this.peers.load || !this.peers.load.socket)) + return; + + // Randomize the non-original peers. + seeds = seeds.slice().sort(function() { + return Math.random() > 0.50 ? 1 : -1; + }); + + // Try to avoid connecting to a peer twice. + // Try the original peers first. + for (i = 0; i < original.length; i++) { + addr = original[i]; + assert(addr.host); + if (this.getPeer(addr)) + continue; + if (this._misbehaving[addr.host]) + continue; + if (tries[addr.host]) + continue; + if (connecting) + tries[addr.host] = true; + return addr; } - if (!addrs) - addrs = this.seeds; - - assert(addrs.length); - - if (this.peers.load) { - addrs = addrs.slice().sort(function() { - return Math.random() > 0.50 ? 1 : -1; - }); - for (i = 0; i < addrs.length; i++) { - if (!this.getPeer(addrs[i])) - return addrs[i]; + // If we are a priority socket, try to find a + // peer this time with looser requirements. + if (priority) { + for (i = 0; i < original.length; i++) { + addr = original[i]; + assert(addr.host); + if (this.peers.load && this.getPeer(addr) === this.peers.load) + continue; + if (this._misbehaving[addr.host]) + continue; + if (tries[addr.host]) + continue; + if (connecting) + tries[addr.host] = true; + return addr; } } - if (addrs.length === 1) - return addrs[0]; + // Try the rest of the peers second. + for (i = 0; i < seeds.length; i++) { + addr = seeds[i]; + assert(addr.host); + if (this.getPeer(addr)) + continue; + if (this._misbehaving[addr.host]) + continue; + return addr; + } - if (!force) - return; + // If we are a priority socket, try to find a + // peer this time with looser requirements. + if (priority) { + for (i = 0; i < seeds.length; i++) { + addr = seeds[i]; + assert(addr.host); + if (this.peers.load && this.getPeer(addr) === this.peers.load) + continue; + if (this._misbehaving[addr.host]) + continue; + return addr; + } + } - do { - addr = addrs[Math.random() * (addrs.length - 1) | 0]; - } while (this.peers.load && this.getPeer(addr) === this.peers.load); + // If we have no block peers, always return + // an address. + if (!priority) { + if (this.peers.pending.length + this.peers.block.length === 0) + return original[Math.random() * (original.length - 1) | 0]; + } - return addr; + // This should never happen: priority sockets + // should _always_ get an address. + if (priority) { + this.emit('debug', + 'We had to connect to a random peer. Something is not right.'); + + return seeds[Math.random() * (seeds.length - 1) | 0]; + } }; Pool.prototype.setSeeds = function setSeeds(seeds) {