pool: more refactoring.

This commit is contained in:
Christopher Jeffrey 2016-08-26 00:57:07 -07:00
parent d21aadf6b3
commit 54dd7c4eb3
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
3 changed files with 126 additions and 106 deletions

View File

@ -102,6 +102,8 @@ Framer.prototype.verack = function verack() {
*/
Framer.prototype.ping = function ping(nonce) {
if (!nonce)
return this.packet('ping', DUMMY);
return this.packet('ping', framePing(nonce));
};

View File

@ -27,17 +27,13 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket;
* @exports Peer
* @constructor
* @param {Pool} pool
* @param {Object} options
* @param {Function?} options.createSocket - Callback which returns a
* node.js-like socket object. Necessary for browser.
* @param {Chain} options.chain
* @param {Mempool} options.mempool
* @param {Number?} options.ts - Time at which peer was discovered (unix time).
* @param {net.Socket?} options.socket
* @param {NetworkAddress?} options.host - Host to connect to.
* @param {NetworkAddress} addr
* @param {net.Socket?} socket
* @property {Pool} pool
* @property {net.Socket?} socket
* @property {String?} host
* @property {String} host
* @property {Number} port
* @property {String} hostname
* @property {Number} port
* @property {Parser} parser
* @property {Framer} framer
@ -68,9 +64,9 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket;
* @emits Peer#ack
*/
function Peer(pool, host, socket) {
function Peer(pool, addr, socket) {
if (!(this instanceof Peer))
return new Peer(pool, host, socket);
return new Peer(pool, addr, socket);
EventEmitter.call(this);
@ -103,7 +99,7 @@ function Peer(pool, host, socket) {
this.lastBlock = null;
this.waiting = 0;
this.syncSent = false;
this._connectTimeout = null;
this.connectTimeout = null;
this.compactMode = null;
this.compactBlocks = {};
this.sentAddr = false;
@ -119,7 +115,7 @@ function Peer(pool, host, socket) {
this.banScore = 0;
this.pingTimer = null;
this.pingTimeout = null;
this.pingInterval = 120000;
this.requestTimeout = 10000;
@ -133,11 +129,11 @@ function Peer(pool, host, socket) {
this.setMaxListeners(10000);
assert(host, 'Host required.');
assert(addr, 'Host required.');
this.host = host.host;
this.port = host.port;
this.hostname = host.hostname;
this.host = addr.host;
this.port = addr.port;
this.hostname = addr.hostname;
if (!socket) {
this.socket = this.connect(this.port, this.host);
@ -189,7 +185,7 @@ Peer.prototype._init = function init() {
});
this.socket.once('error', function(err) {
self._error(err);
self.error(err);
switch (err.code) {
case 'ECONNREFUSED':
@ -203,7 +199,7 @@ Peer.prototype._init = function init() {
});
this.socket.once('close', function() {
self._error('socket hangup');
self.error('socket hangup');
});
this.socket.on('data', function(chunk) {
@ -215,14 +211,14 @@ Peer.prototype._init = function init() {
});
this.parser.on('error', function(err) {
self._error(err, true);
self.error(err, true);
self.reject(null, 'malformed', 'error parsing message', 10);
});
if (this.bip151) {
this.bip151.on('error', function(err) {
self.reject(null, 'malformed', 'error parsing message', 10);
self._error(err, true);
self.error(err, true);
});
this.bip151.on('rekey', function() {
self.logger.debug('Rekeying with peer (%s).', self.hostname);
@ -260,9 +256,9 @@ Peer.prototype._onConnect = function _onConnect() {
this.emit('connect');
if (this._connectTimeout != null) {
clearTimeout(this._connectTimeout);
this._connectTimeout = null;
if (this.connectTimeout != null) {
clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
// Send encinit. Wait for handshake to complete.
@ -272,7 +268,7 @@ Peer.prototype._onConnect = function _onConnect() {
this.write(this.framer.encinit(this.bip151.toEncinit()));
return this.bip151.wait(3000, function(err) {
if (err)
self._error(err, true);
self.error(err, true);
self._onBIP151();
});
}
@ -300,19 +296,19 @@ Peer.prototype._onBIP151 = function _onBIP151() {
assert(!this.bip150.completed);
if (!this.bip151.handshake)
return this._error('BIP151 handshake was not completed for BIP150.');
return this.error('BIP151 handshake was not completed for BIP150.');
this.logger.info('Attempting BIP150 handshake (%s).', this.hostname);
if (this.bip150.outbound) {
if (!this.bip150.peerIdentity)
return this._error('No known identity for peer.');
return this.error('No known identity for peer.');
this.write(this.framer.authChallenge(this.bip150.toChallenge()));
}
return this.bip150.wait(3000, function(err) {
if (err)
return self._error(err);
return self.error(err);
self._onHandshake();
});
}
@ -362,7 +358,7 @@ Peer.prototype._onAck = function _onAck(err) {
var self = this;
if (err) {
this._error(err);
this.error(err);
return;
}
@ -378,7 +374,7 @@ Peer.prototype._onAck = function _onAck(err) {
this.ack = true;
// Setup the ping interval.
this.pingTimer = setInterval(function() {
this.pingTimeout = setInterval(function() {
self.sendPing();
}, this.pingInterval);
@ -463,8 +459,8 @@ Peer.prototype.connect = function connect(port, host) {
self.logger.info('Connected to %s.', self.hostname);
});
this._connectTimeout = setTimeout(function() {
self._error('Connection timed out.');
this.connectTimeout = setTimeout(function() {
self.error('Connection timed out.');
self.ignore();
}, 10000);
@ -633,7 +629,7 @@ Peer.prototype.sendPing = function sendPing() {
return;
if (this.version.version <= 60000) {
this.write(this.framer.packet('ping', new Buffer([])));
this.write(this.framer.ping());
return;
}
@ -712,14 +708,14 @@ Peer.prototype.destroy = function destroy() {
if (this.bip150)
this.bip150.destroy();
if (this.pingTimer != null) {
clearInterval(this.pingTimer);
this.pingTimer = null;
if (this.pingTimeout != null) {
clearInterval(this.pingTimeout);
this.pingTimeout = null;
}
if (this._connectTimeout != null) {
clearTimeout(this._connectTimeout);
this._connectTimeout = null;
if (this.connectTimeout != null) {
clearTimeout(this.connectTimeout);
this.connectTimeout = null;
}
keys = Object.keys(this.requestMap);
@ -756,7 +752,7 @@ Peer.prototype.write = function write(chunk) {
* @param {String|Error} err
*/
Peer.prototype._error = function error(err, keep) {
Peer.prototype.error = function error(err, keep) {
if (this.destroyed)
return;
@ -783,7 +779,7 @@ Peer.prototype.request = function request(cmd, callback) {
var entry;
if (this.destroyed)
return utils.asyncify(callback)(new Error('Destroyed, sorry'));
return callback(new Error('Destroyed'));
entry = new RequestEntry(this, cmd, callback);
@ -1328,21 +1324,21 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
if (!this.network.selfConnect) {
if (version.nonce.cmp(this.pool.localNonce) === 0) {
this._error('We connected to ourself. Oops.');
this.error('We connected to ourself. Oops.');
this.ignore();
return;
}
}
if (version.version < constants.MIN_VERSION) {
this._error('Peer doesn\'t support required protocol version.');
this.error('Peer does not support required protocol version.');
this.ignore();
return;
}
if (this.outbound) {
if (!version.hasNetwork()) {
this._error('Peer does not support network services.');
this.error('Peer does not support network services.');
this.ignore();
return;
}
@ -1350,7 +1346,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
if (this.options.headers) {
if (!version.hasHeaders()) {
this._error('Peer doesn\'t support getheaders.');
this.error('Peer does not support getheaders.');
this.ignore();
return;
}
@ -1358,7 +1354,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
if (this.options.spv) {
if (!version.hasBloom()) {
this._error('Peer does not support bip37.');
this.error('Peer does not support BIP37.');
this.ignore();
return;
}
@ -1367,13 +1363,13 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
if (this.options.witness) {
if (!version.hasWitness()) {
if (!this.network.oldWitness) {
this._error('Peer does not support segregated witness.');
this.error('Peer does not support segregated witness.');
this.ignore();
return;
}
this.request('havewitness', function(err) {
if (err) {
self._error('Peer does not support segregated witness.');
self.error('Peer does not support segregated witness.');
self.ignore();
}
});
@ -1499,7 +1495,7 @@ Peer.prototype._handleGetData = function _handleGetData(items) {
}
if (items.length > 50000) {
this._error('getdata size too large (%s).', items.length);
this.error('getdata size too large (%s).', items.length);
return done();
}
@ -1705,7 +1701,7 @@ Peer.prototype._handlePong = function _handlePong(nonce) {
Peer.prototype._handleGetAddr = function _handleGetAddr() {
var items = [];
var i, host;
var i, addr;
if (this.options.selfish)
return;
@ -1718,15 +1714,15 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() {
this.sentAddr = true;
for (i = 0; i < this.pool.hosts.items.length; i++) {
host = this.pool.hosts.items[i];
addr = this.pool.hosts.items[i];
if (!host.isIP())
if (!addr.isIP())
continue;
if (!this.addrFilter.added(host.host, 'ascii'))
if (!this.addrFilter.added(addr.host, 'ascii'))
continue;
items.push(host);
items.push(addr);
if (items.length === 1000)
break;
@ -1848,7 +1844,7 @@ Peer.prototype._handleEncinit = function _handleEncinit(data) {
try {
this.bip151.encinit(data);
} catch (e) {
this._error(e);
this.error(e);
return;
}
@ -1870,7 +1866,7 @@ Peer.prototype._handleEncack = function _handleEncack(data) {
try {
this.bip151.encack(data);
} catch (e) {
this._error(e);
this.error(e);
return;
}
@ -1892,7 +1888,7 @@ Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(data) {
try {
result = this.bip150.challenge(data);
} catch (e) {
this._error(e);
this.error(e);
return;
}
@ -1916,7 +1912,7 @@ Peer.prototype._handleAuthReply = function _handleAuthReply(data) {
try {
result = this.bip150.reply(data);
} catch (e) {
this._error(e);
this.error(e);
return;
}
@ -1941,7 +1937,7 @@ Peer.prototype._handleAuthPropose = function _handleAuthPropose(data) {
try {
result = this.bip150.propose(data);
} catch (e) {
this._error(e);
this.error(e);
return;
}
@ -2420,6 +2416,7 @@ Peer.prototype.inspect = function inspect() {
+ ' id=' + this.id
+ ' ack=' + this.ack
+ ' host=' + this.hostname
+ ' outbound=' + this.outbound
+ ' ping=' + this.minPing
+ '>';
};

View File

@ -141,7 +141,7 @@ function Pool(options) {
this.scheduled = false;
this.pendingWatch = null;
this.timer = null;
this.timeout = null;
this.interval = null;
this._initOptions();
@ -274,7 +274,7 @@ Pool.prototype._init = function _init() {
});
this.chain.on('full', function() {
self.stopTimer();
self.stopTimeout();
self.stopInterval();
if (!self.synced) {
@ -376,7 +376,12 @@ Pool.prototype._close = function close(callback) {
this.peers.destroy();
this.stopInterval();
this.stopTimer();
this.stopTimeout();
if (this.pendingWatch != null) {
clearTimeout(this.pendingWatch);
this.pendingWatch = null;
}
this.unlisten(callback);
};
@ -521,15 +526,13 @@ Pool.prototype._handleLeech = function _handleLeech(socket) {
};
/**
* Start timer to detect stalling.
* Start timeout to detect stalling.
* @private
*/
Pool.prototype.startTimer = function startTimer() {
Pool.prototype.startTimeout = function startTimeout() {
var self = this;
this.stopTimer();
function destroy() {
if (!self.syncing)
return;
@ -543,25 +546,27 @@ Pool.prototype.startTimer = function startTimer() {
}
}
this.timer = setTimeout(destroy, this.loadTimeout);
this.stopTimeout();
this.timeout = setTimeout(destroy, this.loadTimeout);
};
/**
* Stop the stall timer (done on chain sync).
* Stop the stall timeout (done on chain sync).
* @private
*/
Pool.prototype.stopTimer = function stopTimer() {
if (this.timer != null) {
clearTimeout(this.timer);
this.timer = null;
Pool.prototype.stopTimeout = function stopTimeout() {
if (this.timeout != null) {
clearTimeout(this.timeout);
this.timeout = null;
}
};
/**
* Start the stall interval (shorter than the
* stall timer, inteded to give warnings and
* reset the stall *timer* if the chain is
* stall timeout, inteded to give warnings and
* reset the stall *timeout* if the chain is
* busy). Stopped on chain sync.
* @private
*/
@ -569,9 +574,10 @@ Pool.prototype.stopTimer = function stopTimer() {
Pool.prototype.startInterval = function startInterval() {
var self = this;
this.stopInterval();
function load() {
var peer = self.peers.load;
var hostname = peer ? peer.hostname : null;
if (!self.syncing)
return;
@ -579,11 +585,13 @@ Pool.prototype.startInterval = function startInterval() {
return self.stopInterval();
if (self.chain.isBusy())
return self.startTimer();
return self.startTimeout();
self.logger.warning('Stalling.');
self.logger.warning('Loader peer is stalling (%s).', hostname);
}
this.stopInterval();
this.interval = setInterval(load, this.loadTimeout / 6 | 0);
};
@ -620,9 +628,15 @@ Pool.prototype.addLoader = function addLoader() {
addr = this.getLoaderHost();
peer = this.peers.get(addr);
if (this.syncing) {
this.startTimeout();
this.startInterval();
}
if (peer) {
this.logger.info('Repurposing peer for loader (%s).', peer.hostname);
this.peers.repurpose(peer);
this.logger.info('Repurposed peer for loader (%s).', peer.hostname);
this.fillPeers();
utils.nextTick(function() {
self.emit('loader', peer);
});
@ -649,7 +663,7 @@ Pool.prototype.startSync = function startSync() {
this.syncing = true;
this.startInterval();
this.startTimer();
this.startTimeout();
this.connect();
@ -692,7 +706,7 @@ Pool.prototype.stopSync = function stopSync() {
return;
this.stopInterval();
this.stopTimer();
this.stopTimeout();
if (this.peers.load)
this.peers.load.syncSent = false;
@ -733,7 +747,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer, callback)
// Reset interval to avoid stall behavior.
this.startInterval();
// Reset timeout to avoid killing the loader.
this.startTimer();
this.startTimeout();
}
utils.forEachSerial(headers, function(header, next) {
@ -802,7 +816,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer, callback) {
// Reset interval to avoid stall behavior.
this.startInterval();
// Reset timeout to avoid killing the loader.
this.startTimer();
this.startTimeout();
}
utils.forEachSerial(hashes, function(hash, next, i) {
@ -1029,7 +1043,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
}
self.stopInterval();
self.stopTimer();
self.stopTimeout();
if (self.peers.regular.length === 0) {
self.logger.warning('%s %s %s',
@ -1060,7 +1074,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
if (peer.isLoader()) {
self.startInterval();
self.startTimer();
self.startTimeout();
}
});
});
@ -1080,7 +1094,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
if (peer.isLoader()) {
self.startInterval();
self.startTimer();
self.startTimeout();
}
});
});
@ -1133,33 +1147,33 @@ Pool.prototype.createPeer = function createPeer(addr, socket) {
});
});
peer.on('addr', function(hosts) {
var i, host;
peer.on('addr', function(addrs) {
var i, addr;
if (self.options.ignoreDiscovery)
return;
for (i = 0; i < hosts.length; i++) {
host = hosts[i];
for (i = 0; i < addrs.length; i++) {
addr = addrs[i];
if (!host.hasNetwork())
if (!addr.hasNetwork())
continue;
if (self.options.spv) {
if (!host.hasBloom())
if (!addr.hasBloom())
continue;
}
if (self.options.witness) {
if (!host.hasWitness())
if (!addr.hasWitness())
continue;
}
if (self.hosts.add(host))
self.emit('host', host, peer);
if (self.hosts.add(addr))
self.emit('host', addr, peer);
}
self.emit('addr', hosts, peer);
self.emit('addr', addrs, peer);
self.fillPeers();
});
@ -1398,11 +1412,13 @@ Pool.prototype.addPeer = function addPeer() {
*/
Pool.prototype.fillPeers = function fillPeers() {
this.logger.debug('Refilling peers (%d/%d).',
this.peers.pending.length + this.peers.regular.length,
this.maxPeers - 1);
var i;
for (var i = 0; i < this.maxPeers - 1; i++)
this.logger.debug('Refilling peers (%d/%d).',
this.peers.all.length - this.peers.leeches.length,
this.maxPeers);
for (i = 0; i < this.maxPeers - 1; i++)
this.addPeer();
};
@ -1415,6 +1431,11 @@ Pool.prototype.fillPeers = function fillPeers() {
Pool.prototype.removePeer = function removePeer(peer) {
var i, hashes, hash, item;
if (peer.isLoader() && this.syncing) {
this.stopTimeout();
this.stopInterval();
}
this.peers.remove(peer);
hashes = Object.keys(this.requestMap);
@ -1770,7 +1791,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) {
if (peer.banScore >= constants.BAN_SCORE) {
this.ban(peer);
this.logger.debug('Ban threshold exceeded (%s).', peer.host);
this.logger.debug('Ban threshold exceeded (%s).', peer.hostname);
return true;
}
@ -1779,7 +1800,7 @@ Pool.prototype.setMisbehavior = function setMisbehavior(peer, score) {
/**
* Ban a peer.
* @param {NetworkAddress} host
* @param {NetworkAddress} addr
*/
Pool.prototype.ban = function ban(addr) {
@ -1802,7 +1823,7 @@ Pool.prototype.unban = function unban(addr) {
};
/**
* Test whether the host/peer is banned.
* Test whether the host is banned.
* @param {NetworkAddress} addr
* @returns {Boolean}
*/
@ -1827,7 +1848,7 @@ Pool.prototype.ignore = function ignore(addr) {
};
/**
* Test whether the host/peer is ignored.
* Test whether the host is ignored.
* @param {NetworkAddress} addr
* @returns {Boolean}
*/
@ -2160,7 +2181,7 @@ HostList.prototype.unban = function unban(addr) {
};
/**
* Test whether the host/peer is banned.
* Test whether the host is banned.
* @param {NetworkAddress} addr
* @returns {Boolean}
*/
@ -2291,7 +2312,7 @@ LoadRequest.prototype.addCallback = function addCallback(callback) {
};
/**
* Mark the request as in-flight. Start timeout timer.
* Mark the request as in-flight. Start timeout.
*/
LoadRequest.prototype.start = function start() {