pool/peer: major refactor.

This commit is contained in:
Christopher Jeffrey 2016-08-23 19:54:06 -07:00
parent 581e4e3ce5
commit 73f9a2a622
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
4 changed files with 818 additions and 639 deletions

View File

@ -122,6 +122,7 @@ BIP150.prototype.reply = function reply(payload) {
sig = bcoin.ec.toDER(data);
msg = this.hash(this.output.sid, type, this.peerIdentity);
result = bcoin.ec.verify(msg, sig, this.peerIdentity);
if (!result)
@ -279,10 +280,15 @@ BIP150.prototype.wait = function wait(timeout, callback) {
};
BIP150.prototype.getAddress = function getAddress() {
assert(this.peerIdentity, 'Cannot serialize address.');
return BIP150.address(this.peerIdentity);
};
BIP150.address = function address(key) {
var p = new bcoin.writer();
p.writeU8(0x0f);
p.writeU16BE(0xff01);
p.writeBytes(utils.hash160(this.peerIdentity));
p.writeBytes(utils.hash160(key));
p.writeChecksum();
return utils.toBase58(p.render());
};

View File

@ -414,7 +414,7 @@ RPC.prototype.disconnectnode = function disconnectnode(args, callback) {
node = toString(args[0]);
node = IP.normalize(node);
peer = this.pool.getPeer(node);
peer = this.pool.peers.get(node);
if (peer)
peer.destroy();
@ -430,7 +430,7 @@ RPC.prototype.getaddednodeinfo = function getaddednodeinfo(args, callback) {
if (args.length === 2) {
host = toString(args[1]);
peer = this.pool.getPeer(host);
peer = this.pool.peers.get(host);
if (!peer)
return callback(new RPCError('Node has not been added.'));
peers = [peer];
@ -533,7 +533,7 @@ RPC.prototype.ping = function ping(args, callback) {
};
RPC.prototype.setban = function setban(args, callback) {
var host, peer, ip;
var host, ip;
if (args.help
|| args.length < 2
@ -547,14 +547,10 @@ RPC.prototype.setban = function setban(args, callback) {
switch (args[1]) {
case 'add':
peer = this.pool.getPeer(ip);
if (peer)
peer.setMisbehavior(100);
else
this.pool.peers.misbehaving[ip] = utils.now();
this.pool.ban(ip);
break;
case 'remove':
delete this.pool.peers.misbehaving[ip];
this.pool.unban(ip);
break;
}
@ -568,11 +564,11 @@ RPC.prototype.listbanned = function listbanned(args, callback) {
return callback(new RPCError('listbanned'));
banned = [];
keys = Object.keys(this.pool.peers.misbehaving);
keys = Object.keys(this.pool.hosts.misbehaving);
for (i = 0; i < keys.length; i++) {
host = keys[i];
time = this.pool.peers.misbehaving[host];
time = this.pool.hosts.misbehaving[host];
banned.push({
address: host,
banned_until: time + constants.BAN_TIME,
@ -588,8 +584,7 @@ RPC.prototype.clearbanned = function clearbanned(args, callback) {
if (args.help || args.length !== 0)
return callback(new RPCError('clearbanned'));
this.pool.peers.ignored = {};
this.pool.peers.misbehaving = {};
this.pool.hosts.clear();
callback(null, null);
};

View File

@ -74,6 +74,8 @@ function Peer(pool, options) {
if (!options)
options = {};
assert(typeof options.type === 'number', 'Peer must have a type.');
this.options = options;
this.pool = pool;
this.logger = pool.logger;
@ -119,7 +121,19 @@ function Peer(pool, options) {
this.banScore = 0;
assert(typeof this.type === 'number', 'Peer must have a type.');
this.pingTimer = null;
this.pingInterval = options.pingInterval || 120000;
this.requestTimeout = options.requestTimeout || 10000;
this.requestMap = {};
this.queueBlock = [];
this.queueTX = [];
this.uid = 0;
this.id = Peer.uid++;
this.setMaxListeners(10000);
if (options.socket) {
this.socket = options.socket;
@ -156,27 +170,6 @@ function Peer(pool, options) {
this.parser = new bcoin.protocol.parser(this);
this.framer = new bcoin.protocol.framer(this);
this.requests = {
timeout: options.requestTimeout || 10000,
skip: {},
map: {}
};
this.ping = {
timer: null,
interval: options.pingInterval || 120000
};
this.queue = {
block: [],
tx: []
};
this.uid = 0;
this.id = Peer.uid++;
this.setMaxListeners(10000);
this._init();
}
@ -396,9 +389,9 @@ Peer.prototype._onAck = function _onAck(err) {
this.ts = utils.now();
// Setup the ping interval.
this.ping.timer = setInterval(function() {
this.pingTimer = setInterval(function() {
self.sendPing();
}, this.ping.interval);
}, this.pingInterval);
// Ask for headers-only.
if (this.pool.options.headers) {
@ -427,7 +420,7 @@ Peer.prototype._onAck = function _onAck(err) {
this.updateWatch();
// Announce our currently broadcasted items.
this.announce(this.pool.inv.items);
this.announce(this.pool.invItems);
// Set a fee rate filter.
if (this.pool.feeRate !== -1)
@ -496,7 +489,7 @@ Peer.prototype.createSocket = function createSocket(port, host) {
Peer.prototype.announce = function announce(items) {
var inv = [];
var headers = [];
var i, item;
var i, item, entry;
if (this.destroyed)
return;
@ -507,9 +500,13 @@ Peer.prototype.announce = function announce(items) {
for (i = 0; i < items.length; i++) {
item = items[i];
// Check the peer's bloom
// filter if they're using spv.
if (!this.isWatched(item))
continue;
// Convert item to block headers
// for peers that request it.
if (this.preferHeaders && item.toHeaders) {
item = item.toHeaders();
if (this.invFilter.test(item.hash()))
@ -521,11 +518,23 @@ Peer.prototype.announce = function announce(items) {
if (item.toInv)
item = item.toInv();
// Do not send txs to spv clients
// that have relay unset.
if (!this.relay) {
if (item.type === constants.inv.TX)
continue;
}
// Filter according to peer's fee filter.
if (this.feeRate !== -1 && this.mempool) {
if (item.type === constants.inv.TX) {
entry = this.mempool.getEntry(item.hash);
if (entry && entry.getRate() < this.feeRate)
continue;
}
}
// Don't send if they already have it.
if (this.invFilter.test(item.hash, 'hex'))
continue;
@ -710,19 +719,19 @@ Peer.prototype.destroy = function destroy() {
this.emit('close');
if (this.ping.timer) {
clearInterval(this.ping.timer);
this.ping.timer = null;
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
keys = Object.keys(this.requests.map);
keys = Object.keys(this.requestMap);
for (i = 0; i < keys.length; i++) {
cmd = keys[i];
queue = this.requests.map[cmd];
queue = this.requestMap[cmd];
for (j = 0; j < queue.length; j++)
clearTimeout(queue[j].timer);
queue[j].destroy();
}
};
@ -771,37 +780,17 @@ Peer.prototype._error = function error(err, keep) {
*/
Peer.prototype.request = function request(cmd, callback) {
var self = this;
var entry;
if (this.destroyed)
return utils.asyncify(callback)(new Error('Destroyed, sorry'));
entry = {
cmd: cmd,
callback: callback,
id: this.uid++,
ontimeout: function() {
var queue = self.requests.map[cmd];
entry = new RequestEntry(this, cmd, callback);
if (!queue)
return;
if (!this.requestMap[cmd])
this.requestMap[cmd] = [];
if (utils.binaryRemove(queue, entry, compare)) {
if (queue.length === 0)
delete self.requests.map[cmd];
callback(new Error('Timed out: ' + cmd));
}
},
timer: null
};
entry.timer = setTimeout(entry.ontimeout, this.requests.timeout);
if (!this.requests.map[cmd])
this.requests.map[cmd] = [];
this.requests.map[cmd].push(entry);
this.requestMap[cmd].push(entry);
return entry;
};
@ -814,7 +803,7 @@ Peer.prototype.request = function request(cmd, callback) {
*/
Peer.prototype.response = function response(cmd, payload) {
var queue = this.requests.map[cmd];
var queue = this.requestMap[cmd];
var entry, res;
if (!queue)
@ -827,16 +816,15 @@ Peer.prototype.response = function response(cmd, payload) {
res = entry.callback(null, payload, cmd);
if (res === this.requests.skip)
if (res === false)
return false;
queue.shift();
if (queue.length === 0)
delete this.requests.map[cmd];
delete this.requestMap[cmd];
clearTimeout(entry.timer);
entry.timer = null;
entry.destroy();
return true;
};
@ -1465,7 +1453,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
if (version.hasWitness())
this.haveWitness = true;
if (version.relay === false)
if (!version.relay)
this.relay = false;
// ACK
@ -1524,7 +1512,7 @@ Peer.prototype._handleMempool = function _handleMempool() {
*/
Peer.prototype._getItem = function _getItem(item, callback) {
var entry = this.pool.inv.map[item.hash];
var entry = this.pool.invMap[item.hash];
if (entry) {
this.logger.debug(
@ -1610,9 +1598,7 @@ Peer.prototype._handleGetData = function _handleGetData(items) {
return next();
}
// We should technically calculate this in
// the `mempool` handler, but it would be
// too slow.
// Fallback in case fee filter fails in `announce`.
if (self.feeRate !== -1) {
if (entry.getRate() < self.feeRate)
return next();
@ -1725,7 +1711,7 @@ Peer.prototype._handleAddr = function _handleAddr(addrs) {
this.logger.info(
'Received %d addrs (hosts=%d, peers=%d) (%s).',
addrs.length,
this.pool.hosts.length,
this.pool.hosts.items.length,
this.pool.peers.all.length,
this.hostname);
@ -1801,8 +1787,8 @@ Peer.prototype._handleGetAddr = function _handleGetAddr() {
this.sentAddr = true;
for (i = 0; i < this.pool.hosts.length; i++) {
host = this.pool.hosts[i];
for (i = 0; i < this.pool.hosts.items.length; i++) {
host = this.pool.hosts.items[i];
if (!host.isIP())
continue;
@ -1900,7 +1886,7 @@ Peer.prototype._handleReject = function _handleReject(payload) {
return;
hash = payload.data;
entry = this.pool.inv.map[hash];
entry = this.pool.invMap[hash];
if (!entry)
return;
@ -2317,7 +2303,7 @@ Peer.prototype.sendCompact = function sendCompact() {
*/
Peer.prototype.isMisbehaving = function isMisbehaving() {
return this.pool.isMisbehaving(this.host);
return this.pool.hosts.isMisbehaving(this.host);
};
/**
@ -2326,7 +2312,7 @@ Peer.prototype.isMisbehaving = function isMisbehaving() {
*/
Peer.prototype.isIgnored = function isIgnored() {
return this.pool.isIgnored(this.host);
return this.pool.hosts.isIgnored(this.host);
};
/**
@ -2490,6 +2476,40 @@ Peer.prototype.inspect = function inspect() {
+ '>';
};
/**
* RequestEntry
* @constructor
*/
function RequestEntry(peer, cmd, callback) {
this.peer = peer;
this.cmd = cmd;
this.callback = callback;
this.id = peer.uid++;
this.onTimeout = this._onTimeout.bind(this);
this.timeout = setTimeout(this.onTimeout, this.peer.requestTimeout);
}
RequestEntry.prototype._onTimeout = function _onTimeout() {
var queue = this.peer.requestMap[this.cmd];
if (!queue)
return;
if (utils.binaryRemove(queue, this, compare)) {
if (queue.length === 0)
delete this.peer.requestMap[this.cmd];
this.callback(new Error('Timed out: ' + this.cmd));
}
};
RequestEntry.prototype.destroy = function destroy() {
if (this.timeout != null) {
clearTimeout(this.timeout);
this.timeout = null;
}
};
/*
* Helpers
*/

File diff suppressed because it is too large Load Diff