better loader management.

This commit is contained in:
Christopher Jeffrey 2016-01-19 17:52:29 -08:00
parent 8abaef49a9
commit c11a651e9d
2 changed files with 129 additions and 43 deletions

View File

@ -24,11 +24,11 @@ try {
* Peer
*/
function Peer(pool, createSocket, options) {
function Peer(pool, createConnection, options) {
var self = this;
if (!(this instanceof Peer))
return new Peer(pool, createSocket, options);
return new Peer(pool, createConnection, options);
EventEmitter.call(this);
@ -42,6 +42,7 @@ function Peer(pool, createSocket, options) {
this.version = null;
this.destroyed = false;
this.ack = false;
this.connected = false;
this.ts = this.options.ts || 0;
this.host = null;
@ -52,13 +53,13 @@ function Peer(pool, createSocket, options) {
if (this.options.backoff) {
setTimeout(function() {
self.socket = createSocket(self, pool);
self.socket = createConnection(self, pool, options);
if (!self.socket)
throw new Error('No socket');
self.emit('socket');
}, this.options.backoff);
} else {
this.socket = createSocket(this, pool);
this.socket = createConnection(this, pool, options);
if (!this.socket)
throw new Error('No socket');
}
@ -107,10 +108,12 @@ Peer.prototype._init = function init() {
self.host = self.socket.remoteAddress;
if (!self.port)
self.port = self.socket.remotePort;
self.emit('connect');
});
this.socket.once('error', function(err) {
self._error(err);
self.emit('misbehave');
});
this.socket.once('close', function() {
@ -131,6 +134,7 @@ Peer.prototype._init = function init() {
// Something is wrong here.
// Ignore this peer.
self.destroy();
self.emit('misbehave');
});
if (this.pool.options.fullNode) {
@ -152,12 +156,17 @@ Peer.prototype._init = function init() {
if (err) {
self._error(err);
self.destroy();
self.emit('misbehave');
return;
}
self.ack = true;
self.emit('ack');
self.ts = utils.now();
self._write(self.framer.packet('getaddr', []));
// if (self.pool.options.headers) {
// if (self.version.v > 70012)
// self._write(self.framer.packet('sendheaders', []));
// }
});
// Send hello
@ -291,8 +300,13 @@ Peer.prototype._error = function error(err) {
if (this.destroyed)
return;
if (typeof err === 'string')
err = new Error(err);
err.message += ' (' + this.host + ')';
this.destroy();
this.emit('error', typeof err === 'string' ? new Error(err) : err);
this.emit('error', err);
};
Peer.prototype._req = function _req(cmd, cb) {

View File

@ -37,9 +37,12 @@ function Pool(options) {
this.options.relay = this.options.relay == null
? (this.options.fullNode ? true : false)
: this.options.relay;
this.options.seeds = options.seeds || network.seeds;
this.setSeeds(this.options.seeds);
this.originalSeeds = (options.seeds || network.seeds).map(utils.parseHost);
this.setSeeds([]);
this._priorityTries = {};
this._regularTries = {};
this._misbehaving = {};
this.storage = this.options.storage;
this.destroyed = false;
@ -266,26 +269,32 @@ Pool.prototype._stopInterval = function _stopInterval() {
delete this._interval;
};
Pool.prototype.createConnection = function createConnection(peer, pool) {
Pool.prototype.createConnection = function createConnection(peer, pool, options) {
var addr, net, socket;
if (pool._createConnection)
return pool._createConnection(peer, pool);
addr = pool.usableSeed(true);
addr = pool.usableSeed(options.priority, true);
peer.host = addr.host;
peer.port = addr.port;
if (this._createSocket) {
socket = this._createSocket(addr.port, addr.host);
if (pool._createSocket) {
socket = pool._createSocket(addr.port, addr.host);
} else {
net = require('net');
socket = net.connect(addr.port, addr.host);
}
pool.emit('debug',
'Connecting to %s:%d (priority=%s)',
addr.host, addr.port, options.priority);
socket.on('connect', function() {
pool.emit('debug', 'Connected to %s:%d', addr.host, addr.port);
pool.emit('debug',
'Connected to %s:%d (priority=%s)',
addr.host, addr.port, options.priority);
});
return socket;
@ -301,7 +310,7 @@ Pool.prototype._addLoader = function _addLoader() {
if (this.peers.load != null)
return;
peer = this._createPeer(750 * Math.random());
peer = this._createPeer(750 * Math.random(), true);
peer.once('socket', function() {
self.emit('debug', 'Added loader peer: %s', peer.host);
@ -658,13 +667,14 @@ Pool.prototype.loadMempool = function loadMempool() {
});
};
Pool.prototype._createPeer = function _createPeer(backoff) {
Pool.prototype._createPeer = function _createPeer(backoff, priority) {
var self = this;
var peer = new bcoin.peer(this, this.createConnection, {
backoff: backoff,
startHeight: this.options.startHeight,
relay: this.options.relay
relay: this.options.relay,
priority: priority
});
peer._retry = 0;
@ -678,15 +688,19 @@ Pool.prototype._createPeer = function _createPeer(backoff) {
self.emit.apply(self, ['debug'].concat(args));
});
peer.once('misbehave', function() {
self._misbehaving[peer.host] = true;
});
peer.on('reject', function(payload) {
payload.data = utils.revHex(utils.toHex(payload.data));
var data = utils.revHex(utils.toHex(payload.data));
self.emit('debug',
'Reject: msg=%s ccode=%s reason=%s data=%s',
payload.message,
payload.ccode,
payload.reason,
payload.data);
data);
self.emit('reject', payload, peer);
});
@ -709,7 +723,7 @@ Pool.prototype._createPeer = function _createPeer(backoff) {
peer.on('addr', function(data) {
if (self.seeds.length > 1000)
self.setSeeds(self.options.seeds.concat(self.seeds.slice(-500)));
self.setSeeds(self.seeds.slice(-500));
self.addSeed(data);
@ -724,6 +738,9 @@ Pool.prototype._createPeer = function _createPeer(backoff) {
if (version.height > self.block.bestHeight)
self.block.bestHeight = version.height;
self.emit('version', version, peer);
self.emit('debug',
'Received version from %s: version=%d height=%d agent=%s',
peer.host, peer.version.v, peer.version.height, peer.version.agent);
});
return peer;
@ -744,7 +761,7 @@ Pool.prototype._addPeer = function _addPeer(backoff) {
return;
}
peer = this._createPeer(backoff);
peer = this._createPeer(backoff, false);
this.peers.pending.push(peer);
this.peers.all.push(peer);
@ -1477,40 +1494,95 @@ Pool.prototype.getPeer = function getPeer(addr) {
}
};
Pool.prototype.usableSeed = function usableSeed(addrs, force) {
Pool.prototype.usableSeed = function usableSeed(priority, connecting) {
var i, addr;
var original = this.originalSeeds;
var seeds = this.seeds;
var tries = priority ? this._priorityTries : this._regularTries;
if (typeof addrs === 'boolean') {
force = addrs;
addrs = null;
// Hang back if we don't have a loader peer yet.
if (!connecting && !priority && (!this.peers.load || !this.peers.load.socket))
return;
// Randomize the non-original peers.
seeds = seeds.slice().sort(function() {
return Math.random() > 0.50 ? 1 : -1;
});
// Try to avoid connecting to a peer twice.
// Try the original peers first.
for (i = 0; i < original.length; i++) {
addr = original[i];
assert(addr.host);
if (this.getPeer(addr))
continue;
if (this._misbehaving[addr.host])
continue;
if (tries[addr.host])
continue;
if (connecting)
tries[addr.host] = true;
return addr;
}
if (!addrs)
addrs = this.seeds;
assert(addrs.length);
if (this.peers.load) {
addrs = addrs.slice().sort(function() {
return Math.random() > 0.50 ? 1 : -1;
});
for (i = 0; i < addrs.length; i++) {
if (!this.getPeer(addrs[i]))
return addrs[i];
// If we are a priority socket, try to find a
// peer this time with looser requirements.
if (priority) {
for (i = 0; i < original.length; i++) {
addr = original[i];
assert(addr.host);
if (this.peers.load && this.getPeer(addr) === this.peers.load)
continue;
if (this._misbehaving[addr.host])
continue;
if (tries[addr.host])
continue;
if (connecting)
tries[addr.host] = true;
return addr;
}
}
if (addrs.length === 1)
return addrs[0];
// Try the rest of the peers second.
for (i = 0; i < seeds.length; i++) {
addr = seeds[i];
assert(addr.host);
if (this.getPeer(addr))
continue;
if (this._misbehaving[addr.host])
continue;
return addr;
}
if (!force)
return;
// If we are a priority socket, try to find a
// peer this time with looser requirements.
if (priority) {
for (i = 0; i < seeds.length; i++) {
addr = seeds[i];
assert(addr.host);
if (this.peers.load && this.getPeer(addr) === this.peers.load)
continue;
if (this._misbehaving[addr.host])
continue;
return addr;
}
}
do {
addr = addrs[Math.random() * (addrs.length - 1) | 0];
} while (this.peers.load && this.getPeer(addr) === this.peers.load);
// If we have no block peers, always return
// an address.
if (!priority) {
if (this.peers.pending.length + this.peers.block.length === 0)
return original[Math.random() * (original.length - 1) | 0];
}
return addr;
// This should never happen: priority sockets
// should _always_ get an address.
if (priority) {
this.emit('debug',
'We had to connect to a random peer. Something is not right.');
return seeds[Math.random() * (seeds.length - 1) | 0];
}
};
Pool.prototype.setSeeds = function setSeeds(seeds) {