peer: refactor.

This commit is contained in:
Christopher Jeffrey 2016-08-25 23:32:54 -07:00
parent d073f87874
commit d21aadf6b3
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 118 additions and 75 deletions

View File

@ -934,7 +934,7 @@ NetworkAddress.prototype.inspect = function inspect() {
* Inject properties from hostname and network.
* @private
* @param {String} hostname
* @param {Network|NetworkType} network
* @param {(Network|NetworkType)?} network
*/
NetworkAddress.prototype.fromHostname = function fromHostname(hostname, network) {
@ -966,6 +966,39 @@ NetworkAddress.fromHostname = function fromHostname(hostname, network) {
return new NetworkAddress().fromHostname(hostname, network);
};
/**
* Inject properties from socket.
* @private
* @param {net.Socket} socket
*/
NetworkAddress.prototype.fromSocket = function fromSocket(socket) {
assert(typeof socket.remoteAddress === 'string');
assert(typeof socket.remotePort === 'number');
this.host = IP.normalize(socket.remoteAddress);
this.port = socket.remotePort;
this.services = constants.services.NETWORK
| constants.services.BLOOM
| constants.services.WITNESS;
this.ts = bcoin.now();
this.hostname = IP.hostname(this.host, this.port);
return this;
};
/**
* Instantiate a network address
* from a socket.
* @param {net.Socket} socket
* @returns {NetworkAddress}
*/
NetworkAddress.fromSocket = function fromSocket(hostname) {
return new NetworkAddress().fromSocket(hostname);
};
/**
* Inject properties from serialized data.
* @private

View File

@ -68,24 +68,21 @@ var GetUTXOsPacket = bcoin.packets.GetUTXOsPacket;
* @emits Peer#ack
*/
function Peer(pool, options) {
function Peer(pool, host, socket) {
if (!(this instanceof Peer))
return new Peer(pool, options);
return new Peer(pool, host, socket);
EventEmitter.call(this);
if (!options)
options = {};
this.options = options;
this.pool = pool;
this.options = pool.options;
this.logger = pool.logger;
this.socket = null;
this.outbound = false;
this.host = null;
this.port = 0;
this.hostname = null;
this._createSocket = this.pool.options.createSocket;
this.createSocket = this.options.createSocket;
this.chain = this.pool.chain;
this.mempool = this.pool.mempool;
this.network = this.chain.network;
@ -94,7 +91,7 @@ function Peer(pool, options) {
this.destroyed = false;
this.ack = false;
this.connected = false;
this.ts = options.ts || 0;
this.ts = 0;
this.preferHeaders = false;
this.haveWitness = false;
this.hashContinue = null;
@ -123,9 +120,9 @@ function Peer(pool, options) {
this.banScore = 0;
this.pingTimer = null;
this.pingInterval = options.pingInterval || 120000;
this.pingInterval = 120000;
this.requestTimeout = options.requestTimeout || 10000;
this.requestTimeout = 10000;
this.requestMap = {};
this.queueBlock = [];
@ -136,30 +133,23 @@ function Peer(pool, options) {
this.setMaxListeners(10000);
if (options.socket) {
this.socket = options.socket;
this.host = IP.normalize(this.socket.remoteAddress);
this.port = this.socket.remotePort;
this.connected = true;
this.outbound = false;
} else if (options.host) {
this.host = options.host.host;
this.port = options.host.port;
this.socket = this.createSocket(this.port, this.host);
assert(host, 'Host required.');
this.host = host.host;
this.port = host.port;
this.hostname = host.hostname;
if (!socket) {
this.socket = this.connect(this.port, this.host);
this.outbound = true;
} else {
assert(false, 'No seed or socket.');
this.socket = socket;
this.connected = true;
}
assert(typeof this.host === 'string');
assert(typeof this.port === 'number');
assert(this.socket, 'No socket.');
this.hostname = IP.hostname(this.host, this.port);
if (this.pool.options.bip151) {
if (this.options.bip151) {
this.bip151 = new bcoin.bip151();
if (this.pool.options.bip150) {
if (this.options.bip150) {
this.bip150 = new bcoin.bip150(
this.bip151,
this.hostname,
@ -357,7 +347,7 @@ Peer.prototype._onHandshake = function _onHandshake() {
// Advertise our address.
if (this.pool.address.host !== '0.0.0.0'
&& !this.pool.options.selfish
&& !this.options.selfish
&& this.pool.server) {
this.write(this.framer.addr([this.pool.address]));
}
@ -386,7 +376,6 @@ Peer.prototype._onAck = function _onAck(err) {
}
this.ack = true;
this.ts = utils.now();
// Setup the ping interval.
this.pingTimer = setInterval(function() {
@ -394,7 +383,7 @@ Peer.prototype._onAck = function _onAck(err) {
}, this.pingInterval);
// Ask for headers-only.
if (this.pool.options.headers) {
if (this.options.headers) {
if (this.version.version >= 70012)
this.write(this.framer.sendHeaders());
}
@ -402,13 +391,13 @@ Peer.prototype._onAck = function _onAck(err) {
// Let them know we support segwit (old
// segwit3 nodes require this instead
// of service bits).
if (this.pool.options.witness && this.network.oldWitness) {
if (this.options.witness && this.network.oldWitness) {
if (this.version.version >= 70012)
this.write(this.framer.haveWitness());
}
// We want compact blocks!
if (this.pool.options.compact) {
if (this.options.compact) {
if (this.version.version >= 70014)
this.sendCompact();
}
@ -450,13 +439,14 @@ Peer.prototype._onAck = function _onAck(err) {
* @returns {net.Socket}
*/
Peer.prototype.createSocket = function createSocket(port, host) {
Peer.prototype.connect = function connect(port, host) {
var self = this;
var hostname = IP.hostname(host, port);
var socket, proxy, net;
if (this._createSocket) {
socket = this._createSocket(port, host);
assert(!this.socket);
if (this.createSocket) {
socket = this.createSocket(port, host);
} else {
if (utils.isBrowser) {
proxy = require('./proxysocket');
@ -467,10 +457,10 @@ Peer.prototype.createSocket = function createSocket(port, host) {
}
}
this.logger.debug('Connecting to %s.', hostname);
this.logger.debug('Connecting to %s.', this.hostname);
socket.once('connect', function() {
self.logger.info('Connected to %s.', hostname);
self.logger.info('Connected to %s.', self.hostname);
});
this._connectTimeout = setTimeout(function() {
@ -628,7 +618,7 @@ Peer.prototype.sendVersion = function sendVersion() {
nonce: this.pool.localNonce,
agent: constants.USER_AGENT,
height: this.chain.height,
relay: this.pool.options.relay
relay: this.options.relay
});
this.write(this.framer.version(packet));
@ -685,7 +675,7 @@ Peer.prototype.isWatched = function isWatched(item) {
*/
Peer.prototype.updateWatch = function updateWatch() {
if (!this.pool.options.spv)
if (!this.options.spv)
return;
this.write(this.framer.filterLoad(this.pool.spvFilter));
@ -722,13 +712,16 @@ Peer.prototype.destroy = function destroy() {
if (this.bip150)
this.bip150.destroy();
this.emit('close');
if (this.pingTimer) {
if (this.pingTimer != null) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
if (this._connectTimeout != null) {
clearTimeout(this._connectTimeout);
this._connectTimeout = null;
}
keys = Object.keys(this.requestMap);
for (i = 0; i < keys.length; i++) {
@ -738,6 +731,8 @@ Peer.prototype.destroy = function destroy() {
for (j = 0; j < queue.length; j++)
queue[j].destroy();
}
this.emit('close');
};
/**
@ -849,7 +844,7 @@ Peer.prototype.getData = function getData(items) {
if (item.toInv)
item = item.toInv();
if (this.pool.options.compact
if (this.options.compact
&& this.compactMode
&& item.isBlock()
&& !item.hasWitness()) {
@ -1115,7 +1110,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
if (!this.pool.synced)
return done();
if (this.pool.options.selfish)
if (this.options.selfish)
return done();
if (this.chain.db.options.spv)
@ -1199,7 +1194,7 @@ Peer.prototype._handleGetHeaders = function _handleGetHeaders(payload) {
if (!this.pool.synced)
return done();
if (this.pool.options.selfish)
if (this.options.selfish)
return done();
if (this.chain.db.options.spv)
@ -1282,7 +1277,7 @@ Peer.prototype._handleGetBlocks = function _handleGetBlocks(payload) {
if (!this.pool.synced)
return done();
if (this.pool.options.selfish)
if (this.options.selfish)
return done();
if (this.chain.db.options.spv)
@ -1353,7 +1348,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
}
}
if (this.pool.options.headers) {
if (this.options.headers) {
if (!version.hasHeaders()) {
this._error('Peer doesn\'t support getheaders.');
this.ignore();
@ -1361,7 +1356,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
}
}
if (this.pool.options.spv) {
if (this.options.spv) {
if (!version.hasBloom()) {
this._error('Peer does not support bip37.');
this.ignore();
@ -1369,7 +1364,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
}
}
if (this.pool.options.witness) {
if (this.options.witness) {
if (!version.hasWitness()) {
if (!this.network.oldWitness) {
this._error('Peer does not support segregated witness.');
@ -1426,7 +1421,7 @@ Peer.prototype._handleMempool = function _handleMempool() {
if (!this.pool.synced)
return done();
if (this.pool.options.selfish)
if (this.options.selfish)
return done();
hashes = this.mempool.getSnapshot();
@ -1463,7 +1458,7 @@ Peer.prototype._getItem = function _getItem(item, callback) {
return callback(null, entry.msg);
}
if (this.pool.options.selfish)
if (this.options.selfish)
return callback();
if (item.isTX()) {
@ -1712,7 +1707,7 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() {
var items = [];
var i, host;
if (this.pool.options.selfish)
if (this.options.selfish)
return;
if (this.sentAddr) {
@ -1993,7 +1988,7 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) {
var hash = block.hash('hex');
var result;
if (!this.pool.options.compact) {
if (!this.options.compact) {
this.logger.info('Peer sent unsolicited cmpctblock (%s).', this.hostname);
return;
}
@ -2062,7 +2057,7 @@ Peer.prototype._handleGetBlockTxn = function _handleGetBlockTxn(req) {
if (this.chain.db.options.prune)
return done();
if (this.pool.options.selfish)
if (this.options.selfish)
return done();
item = new InvItem(constants.inv.BLOCK, req.hash);
@ -2405,7 +2400,7 @@ Peer.prototype.sync = function sync(callback) {
this.syncSent = true;
if (this.pool.options.headers) {
if (this.options.headers) {
if (!this.chain.tip.isGenesis())
tip = this.chain.tip.prevBlock;

View File

@ -481,7 +481,7 @@ Pool.prototype.unlisten = function unlisten(callback) {
*/
Pool.prototype._handleLeech = function _handleLeech(socket) {
var hostname, addr;
var addr;
if (!socket.remoteAddress) {
this.logger.debug('Ignoring disconnected leech.');
@ -489,23 +489,22 @@ Pool.prototype._handleLeech = function _handleLeech(socket) {
return;
}
hostname = IP.hostname(socket.remoteAddress, socket.remotePort);
addr = NetworkAddress.fromHostname(hostname, this.network);
addr = NetworkAddress.fromSocket(socket);
if (this.peers.leeches.length >= this.maxLeeches) {
this.logger.debug('Ignoring leech: too many leeches (%s).', hostname);
this.logger.debug('Ignoring leech: too many leeches (%s).', addr.hostname);
socket.destroy();
return;
}
if (this.hosts.isMisbehaving(addr)) {
this.logger.debug('Ignoring misbehaving leech (%s).', hostname);
this.logger.debug('Ignoring misbehaving leech (%s).', addr.hostname);
socket.destroy();
return;
}
if (this.hosts.isIgnored(addr)) {
this.logger.debug('Ignoring leech (%s).', hostname);
this.logger.debug('Ignoring leech (%s).', addr.hostname);
socket.destroy();
return;
}
@ -513,12 +512,12 @@ Pool.prototype._handleLeech = function _handleLeech(socket) {
// Some kind of weird port collision
// between inbound ports and outbound ports.
if (this.peers.get(addr)) {
this.logger.debug('Port collision (%s).', hostname);
this.logger.debug('Port collision (%s).', addr.hostname);
socket.destroy();
return;
}
this.addLeech(socket);
this.addLeech(addr, socket);
};
/**
@ -630,7 +629,7 @@ Pool.prototype.addLoader = function addLoader() {
return;
}
peer = this.createPeer({ host: addr });
peer = this.createPeer(addr);
this.logger.info('Added loader peer (%s).', peer.hostname);
@ -1014,9 +1013,9 @@ Pool.prototype.sendAlert = function sendAlert(alert) {
* @returns {Peer}
*/
Pool.prototype.createPeer = function createPeer(options) {
Pool.prototype.createPeer = function createPeer(addr, socket) {
var self = this;
var peer = new bcoin.peer(this, options);
var peer = new bcoin.peer(this, addr, socket);
peer.once('close', function() {
self.removePeer(peer);
@ -1337,14 +1336,14 @@ Pool.prototype._handleTX = function _handleTX(tx, peer, callback) {
* @param {net.Socket} socket
*/
Pool.prototype.addLeech = function addLeech(socket) {
Pool.prototype.addLeech = function addLeech(addr, socket) {
var self = this;
var peer;
if (!this.loaded)
return socket.destroy();
peer = this.createPeer({ socket: socket });
peer = this.createPeer(addr, socket);
this.logger.info('Added leech peer (%s).', peer.hostname);
@ -1380,7 +1379,7 @@ Pool.prototype.addPeer = function addPeer() {
if (!addr)
return;
peer = this.createPeer({ host: addr });
peer = this.createPeer(addr);
this.peers.addPending(peer);

View File

@ -8,8 +8,6 @@ var EventEmitter = require('events').EventEmitter;
var IOClient = require('socket.io-client');
function ProxySocket(uri) {
var self = this;
if (!(this instanceof ProxySocket))
return new ProxySocket(uri);
@ -20,9 +18,19 @@ function ProxySocket(uri) {
this.socket = new IOClient(uri, { reconnection: false });
this.sendBuffer = [];
this.snonce = null;
this.bytesWritten = 0;
this.bytesRead = 0;
this.remoteAddress = null;
this.remotePort = 0;
this.closed = false;
this._init();
};
ProxySocket.prototype._init = function _init() {
var self = this;
this.socket.on('info', function(info) {
if (self.closed)
return;
@ -48,6 +56,7 @@ function ProxySocket(uri) {
});
this.socket.on('tcp data', function(data) {
self.bytesRead += data.length / 2;
self.emit('data', new Buffer(data, 'hex'));
});
@ -78,6 +87,9 @@ ProxySocket.prototype.connect = function connect(port, host) {
var nonce = 0;
var i, pow;
this.remoteAddress = host;
this.remotePort = port;
if (this.closed) {
this.sendBuffer.length = 0;
return;
@ -116,11 +128,15 @@ ProxySocket.prototype.connect = function connect(port, host) {
};
ProxySocket.prototype.write = function write(data) {
this.bytesWritten += data.length;
if (!this.info) {
this.sendBuffer.push(data);
return true;
}
this.socket.emit('tcp data', data.toString('hex'));
return true;
};