pool: more peer management.

This commit is contained in:
Christopher Jeffrey 2016-08-25 19:42:18 -07:00
parent 0ff93dc705
commit fa366e56fd
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 38 additions and 50 deletions

View File

@ -300,7 +300,7 @@ RPC.prototype.getinfo = function getinfo(args, callback) {
connections: self.pool.peers.all.length,
proxy: '',
difficulty: self._getDifficulty(),
testnet: self.network.type !== 'main',
testnet: self.network.type !== bcoin.network.main,
keypoololdest: 0,
keypoolsize: 0,
unlocked_until: self.wallet.master.until,
@ -447,7 +447,7 @@ RPC.prototype.getaddednodeinfo = function getaddednodeinfo(args, callback) {
addresses: [
{
address: peer.hostname,
connected: peer.type !== bcoin.peer.types.LEECH
connected: peer.outbound
? 'outbound'
: 'inbound'
}
@ -499,7 +499,7 @@ RPC.prototype.getpeerinfo = function getpeerinfo(args, callback) {
id: peer.id,
addr: peer.hostname,
addrlocal: peer.hostname,
relaytxes: peer.type !== bcoin.peer.types.LEECH,
relaytxes: peer.outbound,
lastsend: peer.lastSend / 1000 | 0,
lastrecv: peer.lastRecv / 1000 | 0,
bytessent: peer.socket.bytesWritten,
@ -510,7 +510,7 @@ RPC.prototype.getpeerinfo = function getpeerinfo(args, callback) {
minping: peer.minPing !== -1 ? peer.minPing / 1000 : 0,
version: peer.version ? peer.version.version : 0,
subver: peer.version ? peer.version.agent : '',
inbound: peer.type === bcoin.peer.types.LEECH,
inbound: !peer.outbound,
startingheight: peer.version ? peer.version.height : -1,
banscore: peer.banScore,
inflight: [],
@ -1597,7 +1597,7 @@ RPC.prototype.getmininginfo = function getmininginfo(args, callback) {
genproclimit: self.proclimit,
networkhashps: hashps,
pooledtx: self.mempool.total,
testnet: self.network.type !== 'main',
testnet: self.network.type !== bcoin.network.main,
chain: self.network.type,
generate: self.mining
});

View File

@ -77,13 +77,11 @@ function Peer(pool, options) {
if (!options)
options = {};
assert(typeof options.type === 'number', 'Peer must have a type.');
this.options = options;
this.pool = pool;
this.logger = pool.logger;
this.type = options.type;
this.socket = null;
this.outbound = false;
this.host = null;
this.port = 0;
this.hostname = null;
@ -143,10 +141,12 @@ function Peer(pool, options) {
this.host = IP.normalize(this.socket.remoteAddress);
this.port = this.socket.remotePort;
this.connected = true;
this.outbound = false;
} else if (options.host) {
this.host = options.host.host;
this.port = options.host.port;
this.socket = this.createSocket(this.port, this.host);
this.outbound = true;
} else {
assert(false, 'No seed or socket.');
}
@ -163,7 +163,7 @@ function Peer(pool, options) {
this.bip150 = new bcoin.bip150(
this.bip151,
this.hostname,
this.type !== Peer.types.LEECH,
this.outbound,
this.pool.auth,
this.pool.identityKey);
this.bip151.bip150 = this.bip150;
@ -186,18 +186,6 @@ utils.inherits(Peer, EventEmitter);
Peer.uid = 0;
/**
* Peer types.
* @enum {Number}
* @default
*/
Peer.types = {
LOADER: 0,
REGULAR: 1,
LEECH: 2
};
/**
* Begin peer initialization.
* @private
@ -443,7 +431,7 @@ Peer.prototype._onAck = function _onAck(err) {
// Ask for the mempool if we're synced.
if (this.network.requestMempool) {
if (this.type === Peer.types.LOADER && this.pool.synced)
if (this.isLoader() && this.pool.synced)
this.sendMempool();
}
@ -493,6 +481,15 @@ Peer.prototype.createSocket = function createSocket(port, host) {
return socket;
};
/**
* Test whether the peer is the loader peer.
* @returns {Boolean}
*/
Peer.prototype.isLoader = function isLoader() {
return this === this.pool.peers.load;
};
/**
* Broadcast items to peer (transactions or blocks).
* @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items
@ -1348,7 +1345,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
return;
}
if (this.type !== Peer.types.LEECH) {
if (this.outbound) {
if (!version.hasNetwork()) {
this._error('Peer does not support network services.');
this.ignore();
@ -2401,7 +2398,7 @@ Peer.prototype.sync = function sync(callback) {
if (!this.version.hasNetwork())
return;
if (this.type !== Peer.types.LOADER) {
if (!this.isLoader()) {
if (!this.chain.isFull())
return;
}

View File

@ -615,17 +615,14 @@ Pool.prototype.addLoader = function addLoader() {
if (peer) {
this.peers.repurpose(peer);
this.logger.info('Repurposed loader peer (%s).', peer.hostname);
this.logger.info('Repurposed peer for loader (%s).', peer.hostname);
utils.nextTick(function() {
self.emit('loader', peer);
});
return;
}
peer = this.createPeer({
host: addr,
type: bcoin.peer.types.LOADER
});
peer = this.createPeer({ host: addr });
this.logger.info('Added loader peer (%s).', peer.hostname);
@ -725,7 +722,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
this.emit('headers', headers);
if (peer === this.peers.load) {
if (peer.isLoader()) {
// Reset interval to avoid stall behavior.
this.startInterval();
// Reset timeout to avoid killing the loader.
@ -794,7 +791,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
this.emit('blocks', hashes);
if (peer === this.peers.load) {
if (peer.isLoader()) {
// Reset interval to avoid stall behavior.
this.startInterval();
// Reset timeout to avoid killing the loader.
@ -865,7 +862,7 @@ Pool.prototype._handleInv = function _handleInv(hashes, peer, callback) {
return;
// Ignore for now if we're still syncing
if (!this.synced && peer !== this.peers.load)
if (!this.synced && !peer.isLoader())
return callback();
if (!this.options.headers)
@ -1019,7 +1016,7 @@ Pool.prototype.createPeer = function createPeer(options) {
if (!self.loaded)
return;
if (peer.type !== bcoin.peer.types.LOADER) {
if (!peer.isLoader()) {
self.fillPeers();
return;
}
@ -1054,7 +1051,7 @@ Pool.prototype.createPeer = function createPeer(options) {
if (err)
return self.emit('error', err);
if (peer.type === bcoin.peer.types.LOADER) {
if (peer.isLoader()) {
self.startInterval();
self.startTimer();
}
@ -1074,7 +1071,7 @@ Pool.prototype.createPeer = function createPeer(options) {
if (err)
return self.emit('error', err);
if (peer.type === bcoin.peer.types.LOADER) {
if (peer.isLoader()) {
self.startInterval();
self.startTimer();
}
@ -1252,7 +1249,7 @@ Pool.prototype._handleAlert = function _handleAlert(alert, peer) {
}
// Keep alert disabled on main.
if (this.network.type === 'main') {
if (this.network === bcoin.network.main) {
// https://github.com/bitcoin/bitcoin/pull/7692#issuecomment-197967429
this.logger.warning('The Japanese government sent an alert packet.');
this.logger.warning('Here is their IP: %s.', peer.hostname);
@ -1339,10 +1336,7 @@ Pool.prototype.addLeech = function addLeech(socket) {
if (!this.loaded)
return socket.destroy();
peer = this.createPeer({
socket: socket,
type: bcoin.peer.types.LEECH
});
peer = this.createPeer({ socket: socket });
this.logger.info('Added leech peer (%s).', peer.hostname);
@ -1378,10 +1372,7 @@ Pool.prototype.addPeer = function addPeer() {
if (!addr)
return;
peer = this.createPeer({
host: addr,
type: bcoin.peer.types.REGULAR
});
peer = this.createPeer({ host: addr });
this.peers.addPending(peer);
@ -1939,18 +1930,17 @@ PeerList.prototype.remove = function remove(peer) {
assert(this.map[peer.hostname]);
delete this.map[peer.hostname];
if (this.load === peer) {
if (peer.isLoader()) {
this.pool.logger.info('Removed loader peer (%s).', peer.hostname);
this.load = null;
}
};
PeerList.prototype.repurpose = function repurpose(peer) {
assert(peer.type === bcoin.peer.types.REGULAR);
assert(peer.outbound);
assert(!this.load);
utils.binaryRemove(this.pending, peer, compare);
utils.binaryRemove(this.regular, peer, compare);
peer.type = bcoin.peer.types.LOADER;
assert(!this.load);
this.load = peer;
};
@ -1961,12 +1951,14 @@ PeerList.prototype.isFull = function isFull() {
PeerList.prototype.addLeech = function addLeech(peer) {
this.leeches.push(peer);
this.all.push(peer);
assert(!this.map[peer.hostname]);
this.map[peer.hostname] = peer;
};
PeerList.prototype.addLoader = function addLoader(peer) {
this.load = peer;
this.all.push(peer);
assert(!this.map[peer.hostname]);
this.map[peer.hostname] = peer;
};
@ -2269,8 +2261,7 @@ LoadRequest.prototype.destroy = function destroy() {
*/
LoadRequest.prototype._onTimeout = function _onTimeout() {
if (this.type !== this.pool.txType
&& this.peer === this.pool.peers.load) {
if (this.type !== this.pool.txType && this.peer.isLoader()) {
this.pool.logger.debug(
'Loader took too long serving a block. Finding a new one.');
this.peer.destroy();