net: rewrite hostlist. bitcoind-style management.

This commit is contained in:
Christopher Jeffrey 2016-12-21 11:53:09 -08:00
parent 25101f1784
commit 24e6ed7a26
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
10 changed files with 1197 additions and 427 deletions

View File

@ -407,7 +407,7 @@ RPC.prototype.addnode = co(function* addnode(args) {
break;
case 'onetry':
if (!this.pool.peers.get(addr.hostname)) {
peer = this.pool.createPeer(addr.port, addr.host);
peer = this.pool.createPeer(addr);
this.pool.peers.add(peer);
}
break;
@ -499,6 +499,7 @@ RPC.prototype.getnettotals = co(function* getnettotals(args) {
RPC.prototype.getpeerinfo = co(function* getpeerinfo(args) {
var peers = [];
var id = 0;
var peer;
if (args.help || args.length !== 0)
@ -506,7 +507,7 @@ RPC.prototype.getpeerinfo = co(function* getpeerinfo(args) {
for (peer = this.pool.peers.head(); peer; peer = peer.next) {
peers.push({
id: peer.id,
id: id++,
addr: peer.hostname,
addrlocal: peer.hostname,
relaytxes: peer.outbound,
@ -560,7 +561,7 @@ RPC.prototype.setban = co(function* setban(args) {
case 'add':
peer = this.pool.peers.get(addr.hostname);
if (peer) {
this.pool.ban(peer);
peer.ban();
break;
}
this.pool.hosts.ban(addr.host);
@ -588,7 +589,7 @@ RPC.prototype.listbanned = co(function* listbanned(args) {
time = this.pool.hosts.banned[host];
banned.push({
address: host,
banned_until: time + constants.BAN_TIME,
banned_until: time + this.pool.hosts.banTime,
ban_created: time,
ban_reason: ''
});
@ -601,7 +602,7 @@ RPC.prototype.clearbanned = co(function* clearbanned(args) {
if (args.help || args.length !== 0)
throw new RPCError('clearbanned');
this.pool.hosts.clear();
this.pool.hosts.clearBanned();
return null;
});

View File

@ -9,7 +9,6 @@
var assert = require('assert');
var EventEmitter = require('events').EventEmitter;
var tcp = require('./tcp');
var util = require('../utils/util');
var co = require('../utils/co');
var Parser = require('./parser');
@ -26,7 +25,7 @@ var Block = require('../primitives/block');
var TX = require('../primitives/tx');
var errors = require('../btc/errors');
var List = require('../utils/list');
var IP = require('../utils/ip');
var NetAddress = require('../primitives/netaddress');
var invTypes = constants.inv;
var packetTypes = packets.types;
var VerifyResult = errors.VerifyResult;
@ -43,7 +42,6 @@ var VerifyResult = errors.VerifyResult;
* @property {String} host
* @property {Number} port
* @property {String} hostname
* @property {Number} port
* @property {Parser} parser
* @property {Framer} framer
* @property {Chain} chain
@ -68,7 +66,6 @@ var VerifyResult = errors.VerifyResult;
* @property {Number} lastPing - Timestamp for last `ping`
* sent (unix time).
* @property {Number} minPing - Lowest ping time seen.
* @property {String} id - Peer's uid.
* @property {Number} banScore
* @emits Peer#ack
*/
@ -86,16 +83,12 @@ function Peer(pool) {
this.mempool = this.pool.mempool;
this.network = this.chain.network;
this.locker = new Locker();
this.createSocket = this.options.createSocket;
this.next = null;
this.prev = null;
this.id = Peer.uid++;
this.socket = null;
this.outbound = false;
this.host = '0.0.0.0';
this.port = 0;
this.hostname = '0.0.0.0:0';
this.address = new NetAddress();
this.destroyed = false;
this.ack = false;
this.connected = false;
@ -123,6 +116,7 @@ function Peer(pool) {
this.waitingTX = 0;
this.syncSent = false;
this.sentAddr = false;
this.sentGetAddr = false;
this.challenge = null;
this.lastPong = -1;
this.lastPing = -1;
@ -164,13 +158,17 @@ function Peer(pool) {
util.inherits(Peer, EventEmitter);
/**
* Globally incremented unique id.
* @private
* @type {Number}
*/
Peer.prototype.__defineGetter__('host', function() {
return this.address.host;
});
Peer.uid = 0;
Peer.prototype.__defineGetter__('port', function() {
return this.address.port;
});
Peer.prototype.__defineGetter__('hostname', function() {
return this.address.hostname;
});
/**
* Begin peer initialization.
@ -206,20 +204,6 @@ Peer.prototype._init = function init() {
}
};
/**
* Set peer host, port, and hostname.
* @param {String} host
* @param {Number} port
*/
Peer.prototype.setHost = function setHost(host, port) {
assert(typeof host === 'string');
assert(typeof port === 'number');
this.host = host;
this.port = port;
this.hostname = IP.hostname(host, port);
};
/**
* Bind to socket.
* @param {net.Socket} socket
@ -229,25 +213,19 @@ Peer.prototype.bind = function bind(socket) {
var self = this;
assert(!this.socket);
this.socket = socket;
this.socket.once('connect', function() {
self.logger.info('Connected to %s.', self.hostname);
});
this.socket.once('error', function(err) {
if (!self.connected)
return;
self.error(err);
self.destroy();
switch (err.code) {
case 'ECONNREFUSED':
case 'EHOSTUNREACH':
case 'ENETUNREACH':
case 'ENOTFOUND':
case 'ECONNRESET':
self.ignore();
break;
default:
if (!self.connected)
self.ignore();
break;
}
});
this.socket.once('close', function() {
@ -270,74 +248,48 @@ Peer.prototype.bind = function bind(socket) {
/**
* Accept an inbound socket.
* @param {net.Socket} socket
* @returns {net.Socket}
*/
Peer.prototype.accept = function accept(socket) {
var host = IP.normalize(socket.remoteAddress);
var port = socket.remotePort;
assert(!this.socket);
this.bind(socket);
this.setHost(host, port);
this.address = NetAddress.fromSocket(socket, this.network);
this.address.services = 0;
this.ts = util.now();
this.outbound = false;
this.connected = true;
this.bind(socket);
};
/**
* Create the socket and begin connecting. This method
* will use `options.createSocket` if provided.
* @param {Number} port
* @param {String} host
* @param {NetAddress} addr
* @returns {net.Socket}
*/
Peer.prototype.connect = function connect(port, host) {
var self = this;
Peer.prototype.connect = function connect(addr) {
var proxy = this.pool.proxyServer;
var socket;
assert(!this.socket);
if (this.createSocket)
socket = this.createSocket(port, host, proxy);
else
socket = tcp.connect(port, host, proxy);
socket = this.pool.createSocket(addr.port, addr.host, proxy);
this.bind(socket);
this.setHost(host, port);
this.address = addr;
this.ts = util.now();
this.outbound = true;
this.connected = false;
this.bind(socket);
this.logger.debug('Connecting to %s.', this.hostname);
socket.once('connect', function() {
self.logger.info('Connected to %s.', self.hostname);
});
return socket;
};
/**
* Open and perform initial handshake.
* @returns {Promise}
*/
Peer.prototype.open = co(function* open() {
yield this.initConnect();
yield this.initStall();
yield this.initBIP151();
yield this.initBIP150();
yield this.initVerack();
yield this.finalize();
if (this.destroyed)
throw new Error('Peer was destroyed before handshake.');
// Finally we can let the pool know
// that this peer is ready to go.
this.emit('open');
});
/**
* Open and perform initial handshake (without rejection).
* @returns {Promise}
@ -346,12 +298,58 @@ Peer.prototype.open = co(function* open() {
Peer.prototype.tryOpen = co(function* tryOpen() {
try {
yield this.open();
} catch (e) {
;
}
});
/**
* Open and perform initial handshake.
* @returns {Promise}
*/
Peer.prototype.open = co(function* open() {
try {
yield this._open();
} catch (e) {
this.error(e);
this.destroy();
throw e;
}
});
/**
* Open and perform initial handshake.
* @returns {Promise}
*/
Peer.prototype._open = co(function* open() {
// Mark address attempt.
this.markAttempt();
// Connect to peer.
yield this.initConnect();
yield this.initStall();
// Mark address success.
this.markSuccess();
// Handshake.
yield this.initBIP151();
yield this.initBIP150();
yield this.initVersion();
yield this.finalize();
assert(!this.destroyed);
// Mark address ack.
this.markAck();
// Finally we can let the pool know
// that this peer is ready to go.
this.emit('open');
});
/**
* Wait for connection.
* @private
@ -366,23 +364,33 @@ Peer.prototype.initConnect = function initConnect() {
}
return new Promise(function(resolve, reject) {
function cleanup() {
if (self.connectTimeout != null) {
clearTimeout(self.connectTimeout);
self.connectTimeout = null;
}
self.socket.removeListener('error', onError);
}
function onError(err) {
cleanup();
reject(err);
}
self.socket.once('connect', function() {
self.ts = util.now();
self.connected = true;
self.emit('connect');
if (self.connectTimeout != null) {
clearTimeout(self.connectTimeout);
self.connectTimeout = null;
}
cleanup();
resolve();
});
self.socket.once('error', onError);
self.connectTimeout = setTimeout(function() {
self.connectTimeout = null;
reject(new Error('Connection timed out.'));
self.ignore();
}, 10000);
});
};
@ -424,6 +432,9 @@ Peer.prototype.initBIP151 = co(function* initBIP151() {
this.error(err);
}
if (this.destroyed)
throw new Error('Peer was destroyed during BIP151 handshake.');
assert(this.bip151.completed);
if (this.bip151.handshake) {
@ -456,6 +467,9 @@ Peer.prototype.initBIP150 = co(function* initBIP150() {
yield this.bip150.wait(3000);
if (this.destroyed)
throw new Error('Peer was destroyed during BIP150 handshake.');
assert(this.bip150.completed);
if (this.bip150.auth) {
@ -470,12 +484,12 @@ Peer.prototype.initBIP150 = co(function* initBIP150() {
* @private
*/
Peer.prototype.initVerack = co(function* initVerack() {
Peer.prototype.initVersion = co(function* initVersion() {
// Say hello.
this.sendVersion();
// Advertise our address.
if (this.pool.address.host !== '0.0.0.0'
if (!this.pool.address.isNull()
&& !this.options.selfish
&& this.pool.server) {
this.send(new packets.AddrPacket([this.pool.address]));
@ -483,6 +497,9 @@ Peer.prototype.initVerack = co(function* initVerack() {
yield this.request('verack');
if (this.destroyed)
throw new Error('Peer was destroyed during version handshake.');
// Wait for _their_ version.
if (!this.version) {
this.logger.debug(
@ -491,9 +508,52 @@ Peer.prototype.initVerack = co(function* initVerack() {
yield this.request('version');
if (this.destroyed)
throw new Error('Peer was destroyed during version handshake.');
assert(this.version);
}
if (!this.network.selfConnect) {
if (util.equal(this.version.nonce, this.pool.localNonce))
throw new Error('We connected to ourself. Oops.');
}
if (this.version.version < constants.MIN_VERSION)
throw new Error('Peer does not support required protocol version.');
if (this.options.witness) {
this.haveWitness = this.version.hasWitness();
if (!this.haveWitness && this.network.oldWitness) {
try {
yield this.request('havewitness');
this.haveWitness = true;
} catch (err) {
;
}
}
}
if (this.outbound) {
if (!this.version.hasNetwork())
throw new Error('Peer does not support network services.');
if (this.options.headers) {
if (!this.version.hasHeaders())
throw new Error('Peer does not support getheaders.');
}
if (this.options.spv) {
if (!this.version.hasBloom())
throw new Error('Peer does not support BIP37.');
}
if (this.options.witness) {
if (!this.haveWitness)
throw new Error('Peer does not support segregated witness.');
}
}
this.ack = true;
this.logger.debug('Received verack (%s).', this.hostname);
@ -533,7 +593,8 @@ Peer.prototype.finalize = co(function* finalize() {
}
// Find some more peers.
this.send(new packets.GetAddrPacket());
if (!this.pool.hosts.isFull())
this.sendGetAddr();
// Relay our spv filter if we have one.
this.updateWatch();
@ -768,7 +829,7 @@ Peer.prototype.sendHeaders = function sendHeaders(items) {
Peer.prototype.sendVersion = function sendVersion() {
var packet = new packets.VersionPacket();
packet.version = constants.VERSION;
packet.services = this.pool.services;
packet.services = this.pool.address.services;
packet.ts = this.network.now();
packet.from = this.pool.address;
packet.nonce = this.pool.localNonce;
@ -778,6 +839,18 @@ Peer.prototype.sendVersion = function sendVersion() {
this.send(packet);
};
/**
* Send a `getaddr` packet.
*/
Peer.prototype.sendGetAddr = function sendGetAddr() {
if (this.sentGetAddr)
return;
this.sentGetAddr = true;
this.send(new packets.GetAddrPacket());
};
/**
* Send a `ping` packet.
*/
@ -1556,64 +1629,12 @@ Peer.prototype.handleVersion = co(function* handleVersion(version) {
if (this.version)
throw new Error('Peer sent a duplicate version.');
if (!this.network.selfConnect) {
if (util.equal(version.nonce, this.pool.localNonce)) {
this.ignore();
throw new Error('We connected to ourself. Oops.');
}
}
if (version.version < constants.MIN_VERSION) {
this.ignore();
throw new Error('Peer does not support required protocol version.');
}
if (this.options.witness) {
this.haveWitness = version.hasWitness();
if (!this.haveWitness && this.network.oldWitness) {
try {
yield this.request('havewitness');
this.haveWitness = true;
} catch (err) {
;
}
}
}
if (this.outbound) {
if (!version.hasNetwork()) {
this.ignore();
throw new Error('Peer does not support network services.');
}
if (this.options.headers) {
if (!version.hasHeaders()) {
this.ignore();
throw new Error('Peer does not support getheaders.');
}
}
if (this.options.spv) {
if (!version.hasBloom()) {
this.ignore();
throw new Error('Peer does not support BIP37.');
}
}
if (this.options.witness) {
if (!this.haveWitness) {
this.ignore();
throw new Error('Peer does not support segregated witness.');
}
}
}
this.relay = version.relay;
this.version = version;
this.fire('version', version);
this.send(new packets.VerackPacket());
this.fire('version', version);
});
/**
@ -2030,7 +2051,7 @@ Peer.prototype.handlePong = co(function* handlePong(packet) {
Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) {
var items = [];
var addr;
var i, addrs, addr;
if (this.options.selfish)
return;
@ -2042,7 +2063,11 @@ Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) {
this.sentAddr = true;
for (addr = this.pool.hosts.head(); addr; addr = addr.next) {
addrs = this.pool.hosts.toArray();
for (i = 0; i < addrs.length; i++) {
addr = addrs[i];
if (!this.addrFilter.added(addr.hostname, 'ascii'))
continue;
@ -2646,9 +2671,9 @@ Peer.prototype.sendCompact = function sendCompact() {
Peer.prototype.increaseBan = function increaseBan(score) {
this.banScore += score;
if (this.banScore >= constants.BAN_SCORE) {
if (this.banScore >= this.pool.banScore) {
this.logger.debug('Ban threshold exceeded (%s).', this.hostname);
this.pool.ban(this);
this.ban();
return true;
}
@ -2656,11 +2681,39 @@ Peer.prototype.increaseBan = function increaseBan(score) {
};
/**
* Ignore peer.
* Ban peer.
*/
Peer.prototype.ignore = function ignore() {
return this.pool.ignore(this);
Peer.prototype.ban = function ban() {
this.logger.debug('Banning peer (%s).', this.hostname);
this.pool.hosts.ban(this.host);
this.pool.hosts.remove(this.hostname);
this.destroy();
};
/**
* Mark connection attempt.
*/
Peer.prototype.markAttempt = function markAttempt() {
this.pool.hosts.markAttempt(this.hostname);
};
/**
* Mark connection success.
*/
Peer.prototype.markSuccess = function markSuccess() {
this.pool.hosts.markSuccess(this.hostname);
};
/**
* Mark ack success.
*/
Peer.prototype.markAck = function markAck() {
assert(this.version);
this.pool.hosts.markAck(this.hostname, this.version.services);
};
/**
@ -2674,8 +2727,7 @@ Peer.prototype.ignore = function ignore() {
Peer.prototype.reject = function reject(obj, code, reason, score) {
this.sendReject(code, reason, obj);
if (score > 0)
this.increaseBan(score);
this.increaseBan(score);
};
/**

File diff suppressed because it is too large Load Diff

View File

@ -9,8 +9,8 @@
var ProxySocket = require('./proxysocket');
var tcp = exports;
tcp.connect = function connect(port, host, uri) {
return ProxySocket.connect(uri, port, host);
tcp.createSocket = function createSocket(port, host, proxy) {
return ProxySocket.connect(proxy, port, host);
};
tcp.Server = null;
tcp.createServer = null;

View File

@ -9,8 +9,10 @@
var net = require('net');
var tcp = exports;
tcp.connect = function connect(port, host) {
tcp.createSocket = function createSocket(port, host, proxy) {
return net.connect(port, host);
};
tcp.Server = net.Server;
tcp.createServer = function createServer() {
return new net.Server();
};

View File

@ -6,11 +6,11 @@
'use strict';
var assert = require('assert');
var constants = require('../protocol/constants');
var Network = require('../protocol/network');
var util = require('../utils/util');
var IP = require('../utils/ip');
var assert = require('assert');
var StaticWriter = require('../utils/staticwriter');
var BufferReader = require('../utils/reader');
@ -39,9 +39,6 @@ function NetAddress(options) {
this.ts = 0;
this.hostname = '0.0.0.0:0';
this.prev = null;
this.next = null;
if (options)
this.fromOptions(options);
}
@ -122,6 +119,15 @@ NetAddress.prototype.hasWitness = function hasWitness() {
return (this.services & constants.services.WITNESS) !== 0;
};
/**
* Test whether the host is null.
* @returns {Boolean}
*/
NetAddress.prototype.isNull = function isNull() {
return this.host === '0.0.0.0' || this.host === '::';
};
/**
* Set host.
* @param {String} host
@ -326,8 +332,7 @@ NetAddress.prototype.toRaw = function toRaw(full) {
NetAddress.prototype.inspect = function inspect() {
return '<NetAddress:'
+ ' id=' + this.id
+ ' hostname=' + IP.hostname(this.host, this.port)
+ ' hostname=' + this.hostname
+ ' services=' + this.services.toString(2)
+ ' date=' + util.date(this.ts)
+ '>';

View File

@ -49,12 +49,12 @@ main.type = 'main';
*/
main.seeds = [
'seed.bitcoin.sipa.be', // Pieter Wuille
'dnsseed.bluematt.me', // Matt Corallo
'dnsseed.bitcoin.dashjr.org', // Luke Dashjr
'seed.bitcoinstats.com', // Christian Decker
'bitseed.xf2.org', // Jeff Garzik
'seed.bitcoin.jonasschnelli.ch', // Jonas Schnelli
'seed.bitcoin.sipa.be', // Pieter Wuille
'dnsseed.bluematt.me' // Matt Corallo
'seed.bitcoin.jonasschnelli.ch' // Jonas Schnelli
];
/**

View File

@ -100,6 +100,26 @@ TimeData.prototype.now = function now() {
return util.now() + this.offset;
};
/**
* Adjust a timestamp.
* @param {Number} time
* @returns {Number} Adjusted Time.
*/
TimeData.prototype.adjust = function adjust(time) {
return time + this.offset;
};
/**
* Unadjust a timestamp.
* @param {Number} time
* @returns {Number} Local Time.
*/
TimeData.prototype.local = function local(time) {
return time - this.offset;
};
/*
* Helpers
*/

View File

@ -175,6 +175,32 @@ List.prototype.remove = function remove(item) {
return true;
};
/**
* Replace an item in-place.
* @param {ListItem} ref
* @param {ListItem} item
*/
List.prototype.replace = function replace(ref, item) {
if (ref.prev)
ref.prev.next = item;
if (ref.next)
ref.next.prev = item;
item.prev = ref.prev;
item.next = ref.next;
ref.next = null;
ref.prev = null;
if (this.head === ref)
this.head = item;
if (this.tail === ref)
this.tail = item;
};
/**
* Slice the list to an array of items.
* Will remove the items sliced.

View File

@ -541,6 +541,17 @@ util.time = function time(date) {
return new Date(date) / 1000 | 0;
};
/**
* Get random range.
* @param {Number} min
* @param {Number} max
* @returns {Number}
*/
util.random = function random(min, max) {
return Math.floor(Math.random() * (max - min)) + min;
};
/**
* Create a 64 bit nonce.
* @returns {Buffer}
@ -548,8 +559,8 @@ util.time = function time(date) {
util.nonce = function _nonce() {
var nonce = new Buffer(8);
nonce.writeUInt32LE((Math.random() * 0x100000000) >>> 0, 0, true);
nonce.writeUInt32LE((Math.random() * 0x100000000) >>> 0, 4, true);
nonce.writeUInt32LE(util.random(0, 0x100000000), 0, true);
nonce.writeUInt32LE(util.random(0, 0x100000000), 4, true);
return nonce;
};