fixes. peer improvements.

This commit is contained in:
Christopher Jeffrey 2016-01-05 01:03:11 -08:00
parent 019ecf4f2a
commit 2f618c961b
4 changed files with 166 additions and 63 deletions

View File

@ -20,9 +20,7 @@ bcoin.output = require('./bcoin/output');
bcoin.tx = require('./bcoin/tx');
bcoin.txPool = require('./bcoin/tx-pool');
bcoin.block = require('./bcoin/block');
bcoin.chain = require('./bcoin/chain');
bcoin.spvChain = require('./bcoin/chain');
bcoin.fullChain = require('./bcoin/fullchain');
bcoin.chain = require('./bcoin/fullchain');
bcoin.wallet = require('./bcoin/wallet');
bcoin.peer = require('./bcoin/peer');
bcoin.pool = require('./bcoin/pool');

View File

@ -186,7 +186,7 @@ Chain.prototype.resetHeight = function resetHeight(height) {
var self = this;
var ahead = this.index.entries.slice(height + 1);
assert(height <= this.index.entries.length - 1);
assert(height < this.index.entries.length);
if (height === this.index.entries.length - 1)
return;

View File

@ -42,16 +42,21 @@ function Peer(pool, createSocket, options) {
this.destroyed = false;
this.ack = false;
this.ts = this.options.ts || 0;
this.address = '0.0.0.0';
this.port = network.port;
this.host = null;
this.port = 0;
if (this.options.backoff) {
setTimeout(function() {
self.socket = createSocket(self, pool);
if (!self.socket)
throw new Error('No socket');
self.emit('socket');
}, this.options.backoff);
} else {
this.socket = createSocket(this, pool);
if (!this.socket)
throw new Error('No socket');
}
this._broadcast = {
@ -72,7 +77,7 @@ function Peer(pool, createSocket, options) {
interval: this.options.pingInterval || 30000
};
this.setMaxListeners(4000);
this.setMaxListeners(10000);
if (this.socket)
this._init();
@ -85,10 +90,19 @@ inherits(Peer, EventEmitter);
Peer.prototype._init = function init() {
var self = this;
if (!this.host)
this.host = this.socket.remoteAddress || this.socket._host || null;
if (!this.port)
this.port = this.socket.remotePort || 0;
this.socket.once('connect', function() {
self.ts = utils.now();
self.address = self.socket.remoteAddress;
self.port = self.socket.remotePort;
self.connected = true;
if (!self.host)
self.host = self.socket.remoteAddress;
if (!self.port)
self.port = self.socket.remotePort;
});
this.socket.once('error', function(err) {
@ -97,6 +111,7 @@ Peer.prototype._init = function init() {
this.socket.once('close', function() {
self._error('socket hangup');
self.connected = false;
});
this.socket.on('data', function(chunk) {
@ -115,7 +130,7 @@ Peer.prototype._init = function init() {
this.once('version', function() {
self.pool.emit('debug',
'Sent version (%s): height=%s',
self.address, this.pool.chain.getStartHeight());
self.host, this.pool.chain.getStartHeight());
});
}
@ -133,8 +148,10 @@ Peer.prototype._init = function init() {
}));
this._req('verack', function(err, payload) {
if (err)
if (err) {
self.destroy();
return self._error(err);
}
self.ack = true;
self.emit('ack');
self.ts = utils.now();
@ -358,12 +375,12 @@ Peer.prototype._onPacket = function onPacket(packet) {
if (cmd === 'merkleblock' || cmd === 'block') {
payload.network = true;
payload.relayedBy = this.address;
payload.relayedBy = this.host || '0.0.0.0';
payload = bcoin.block(payload, cmd);
this.lastBlock = payload;
} else if (cmd === 'tx') {
payload.network = true;
payload.relayedBy = this.address;
payload.relayedBy = this.host || '0.0.0.0';
payload = bcoin.tx(payload, this.lastBlock);
}
@ -411,11 +428,14 @@ Peer.prototype._handleAddr = function handleAddr(addrs) {
service: addr.service,
ipv4: addr.ipv4,
ipv6: addr.ipv6,
address: addr.ipv4,
address6: addr.ipv6,
host: addr.ipv4,
host6: addr.ipv6,
port: addr.port,
host: addr.ipv4 + ':' + addr.port,
host6: '[' + addr.ipv6 + ']:' + addr.port
addr: addr.ipv4 + ':' + addr.port,
addr6: '[' + addr.ipv6 + ']:' + addr.port,
// Deprecated:
address: addr.ipv4,
address6: addr.ipv6
});
}, this);
};
@ -432,21 +452,15 @@ Peer.prototype._handleGetAddr = function handleGetAddr() {
var used = [];
var peers, addrs;
peers = [].concat(
this.pool.peers.pending,
this.pool.peers.block,
this.pool.peers.load
).filter(Boolean);
// NOTE: For IPv6 BTC uses:
// '0000:0000:0000:0000:0000:xxxx:xxxx:ffff'
peers = peers.map(function(peer) {
peers = this.pool.peers.all.map(function(peer) {
if (!peer.socket || !peer.socket.remoteAddress)
return;
return {
host: peer.socket.remoteAddress,
port: peer.socket.remotePort || 8333,
port: peer.socket.remotePort || network.port,
ts: peer.ts
};
}).filter(function(peer) {
@ -532,14 +546,14 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) {
Peer.prototype.loadHeaders = function loadHeaders(hashes, stop) {
this.emit('debug',
'Requesting headers packet from %s with getheaders',
this.address);
this.host);
this._write(this.framer.getHeaders(hashes, stop));
};
Peer.prototype.loadBlocks = function loadBlocks(hashes, stop) {
this.emit('debug',
'Requesting inv packet from %s with getblocks',
this.address);
this.host);
this._write(this.framer.getBlocks(hashes, stop));
};

View File

@ -43,6 +43,10 @@ function Pool(options) {
this.size = options.size || 32;
this.parallel = options.parallel || 2000;
this.redundancy = options.redundancy || 2;
this.seeds = network.seeds.slice();
this._createConnection = options.createConnection;
this._createSocket = options.createSocket;
if (!this.options.fullNode) {
if (this.options.headers == null)
@ -79,9 +83,7 @@ function Pool(options) {
this.maxRetries = options.maxRetries || 42;
this.requestTimeout = options.requestTimeout || 10000;
Chain = bcoin.fullChain;
this.chain = new Chain({
this.chain = new bcoin.chain({
storage: this.storage,
fullNode: this.options.fullNode
});
@ -100,7 +102,9 @@ function Pool(options) {
// Peers that are still connecting
pending: [],
// Peers that are loading block ids
load: null
load: null,
// All peers
all: []
};
this.block = {
@ -127,7 +131,7 @@ function Pool(options) {
};
// Currently broadcasted TXs
this.tx = {
this.inv = {
list: [],
timeout: options.txTimeout || 60000
};
@ -136,9 +140,6 @@ function Pool(options) {
this.options.wallets = this.options.wallets || [];
this.wallets = [];
this.createSocket = options.createConnection || options.createSocket;
assert(this.createSocket);
this.chain.on('debug', function() {
var args = Array.prototype.slice.call(arguments);
self.emit.apply(self, ['debug'].concat(args));
@ -240,6 +241,34 @@ Pool.prototype._stopInterval = function _stopInterval() {
delete this._interval;
};
Pool.prototype.createConnection = function createConnection(peer, pool) {
var addr, parts, host, net, socket, port;
if (pool._createConnection)
return pool._createConnection(peer, pool);
addr = pool.usableSeed(true);
parts = addr.split(':');
host = parts[0];
port = +parts[1] || network.port;
peer.host = host;
peer.port = port;
if (this._createSocket) {
socket = this._createSocket(port, host);
} else {
net = require('net');
socket = net.connect(port, host);
}
socket.on('connect', function() {
pool.emit('debug', 'Connected to %s:%d', host, port);
});
return socket;
};
Pool.prototype._addLoader = function _addLoader() {
var self = this;
@ -251,13 +280,14 @@ Pool.prototype._addLoader = function _addLoader() {
if (this.peers.load != null)
return;
peer = new bcoin.peer(this, this.createSocket, {
peer = new bcoin.peer(this, this.createConnection, {
backoff: 750 * Math.random(),
startHeight: this.options.startHeight,
relay: this.options.relay
});
this.peers.load = peer;
this.peers.all.push(peer);
peer.on('error', function(err) {
self.emit('error', err, peer);
@ -278,10 +308,6 @@ Pool.prototype._addLoader = function _addLoader() {
});
peer.once('ack', function() {
if (!peer.socket) {
peer.destroy();
return;
}
peer.updateWatch();
if (!self._load())
self._stopTimer();
@ -330,7 +356,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
this.emit('debug',
'Recieved %s headers from %s',
headers.length,
peer.address);
peer.host);
if (headers.length > 2000) {
peer.destroy();
@ -376,7 +402,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) {
this.emit('debug',
'Requesting %s block packets from %s with getdata',
this.request.queue.length,
peer.address
peer.host
);
};
@ -391,7 +417,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('debug',
'Recieved %s block hashes from %s',
hashes.length,
peer.address);
peer.host);
if (hashes.length > 500) {
peer.destroy();
@ -431,7 +457,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) {
this.emit('debug',
'Requesting %s block packets from %s with getdata',
this.request.queue.length,
peer.address
peer.host
);
};
@ -584,13 +610,19 @@ Pool.prototype._addPeer = function _addPeer(backoff) {
if (this.peers.block.length + this.peers.pending.length >= this.size)
return;
peer = new bcoin.peer(this, this.createSocket, {
if (!this._createConnection && !this.usableSeed()) {
setTimeout(this._addPeer.bind(this, 0), this.backoff.max);
return;
}
peer = new bcoin.peer(this, this.createConnection, {
backoff: backoff,
startHeight: this.options.startHeight,
relay: this.options.relay
});
this.peers.pending.push(peer);
this.peers.all.push(peer);
peer._retry = 0;
@ -608,17 +640,12 @@ Pool.prototype._addPeer = function _addPeer(backoff) {
self._removePeer(peer);
if (self.destroyed)
return;
self._addPeer(Math.max(backoff + self.backoff.delta, self.backoff.max));
self._addPeer(Math.min(backoff + self.backoff.delta, self.backoff.max));
});
peer.once('ack', function() {
var i;
if (!peer.socket) {
peer.destroy();
return;
}
if (self.destroyed)
return;
@ -630,8 +657,8 @@ Pool.prototype._addPeer = function _addPeer(backoff) {
peer.updateWatch();
self.tx.list.forEach(function(entry) {
var result = peer.broadcast(entry.tx);
self.inv.list.forEach(function(entry) {
var result = peer.broadcast(entry.msg);
if (!result)
return;
@ -677,6 +704,16 @@ Pool.prototype._addPeer = function _addPeer(backoff) {
});
peer.on('addr', function(addr) {
var host = addr.ipv4 + ':' + addr.port;
if (self.seeds.length > self.size * 2)
self.seeds = network.seeds.slice();
if (!~self.seeds.indexOf(host)) {
self.emit('debug', 'Found new peer: %s', host);
self.seeds.push(host);
}
self.emit('addr', addr, peer);
});
@ -720,6 +757,10 @@ Pool.prototype._removePeer = function _removePeer(peer) {
if (i !== -1)
this.peers.block.splice(i, 1);
i = this.peers.all.indexOf(peer);
if (i !== -1)
this.peers.all.splice(i, 1);
if (this.peers.load === peer)
this.peers.load = null;
};
@ -1178,7 +1219,7 @@ Pool.prototype.getBlock = function getBlock(hash, cb) {
};
Pool.prototype.sendBlock = function sendBlock(block) {
return this.sendTX(block);
return this.broadcast(block);
};
Pool.prototype.getTX = function getTX(hash, range, cb) {
@ -1248,23 +1289,27 @@ Pool.prototype.getTX = function getTX(hash, range, cb) {
};
Pool.prototype.sendTX = function sendTX(tx) {
return this.broadcast(tx);
};
Pool.prototype.broadcast = function broadcast(msg) {
var self = this;
var e = new EventEmitter();
var entry = {
tx: tx,
msg: msg,
e: e,
timer: setTimeout(function() {
var i = self.tx.list.indexOf(entry);
var i = self.inv.list.indexOf(entry);
if (i !== -1)
self.tx.list.splice(i, 1);
}, this.tx.timeout)
self.inv.list.splice(i, 1);
}, this.inv.timeout)
};
this.tx.list.push(entry);
this.inv.list.push(entry);
this.peers.block.forEach(function(peer) {
var result = peer.broadcast(tx);
var result = peer.broadcast(msg);
if (!result) return;
result[0].once('request', function() {
e.emit('ack', peer);
@ -1287,9 +1332,9 @@ Pool.prototype.destroy = function destroy() {
item.finish(null);
});
this.tx.list.forEach(function(tx) {
clearTimeout(tx.timer);
tx.timer = null;
this.inv.list.forEach(function(entry) {
clearTimeout(entry.timer);
entry.timer = null;
});
this.peers.pending.slice().forEach(function(peer) {
@ -1306,6 +1351,52 @@ Pool.prototype.destroy = function destroy() {
this.load.timer = null;
};
Pool.prototype.getPeer = function getPeer(addr) {
var parts, host, port, i, peer;
if (!addr)
return;
parts = addr.split(':');
host = parts[0];
port = +parts[1] || network.port;
for (i = 0; i < this.peers.all.length; i++) {
peer = this.peers.all[i];
if (peer.host === host)
return peer;
}
};
Pool.prototype.usableSeed = function usableSeed(addrs, force) {
var i, addr;
if (typeof addrs === 'boolean') {
force = addrs;
addrs = null;
}
if (!addrs)
addrs = this.seeds;
for (i = 0; i < addrs.length; i++) {
if (!this.getPeer(addrs[i]))
return addrs[i];
}
if (!force)
return;
if (addrs.length === 1)
return addrs[0];
do {
addr = addrs[Math.random() * (addrs.length - 1) | 0];
} while (this.peers.load && this.getPeer(addr) === this.peers.load);
return addr;
};
Pool.prototype.toJSON = function toJSON() {
return {
v: 1,