From 2f618c961baad00e7e5920522cb617543b0fd447 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 5 Jan 2016 01:03:11 -0800 Subject: [PATCH] fixes. peer improvements. --- lib/bcoin.js | 4 +- lib/bcoin/fullchain.js | 2 +- lib/bcoin/peer.js | 60 +++++++++------ lib/bcoin/pool.js | 163 ++++++++++++++++++++++++++++++++--------- 4 files changed, 166 insertions(+), 63 deletions(-) diff --git a/lib/bcoin.js b/lib/bcoin.js index 7f13baf4..6c3e0578 100644 --- a/lib/bcoin.js +++ b/lib/bcoin.js @@ -20,9 +20,7 @@ bcoin.output = require('./bcoin/output'); bcoin.tx = require('./bcoin/tx'); bcoin.txPool = require('./bcoin/tx-pool'); bcoin.block = require('./bcoin/block'); -bcoin.chain = require('./bcoin/chain'); -bcoin.spvChain = require('./bcoin/chain'); -bcoin.fullChain = require('./bcoin/fullchain'); +bcoin.chain = require('./bcoin/fullchain'); bcoin.wallet = require('./bcoin/wallet'); bcoin.peer = require('./bcoin/peer'); bcoin.pool = require('./bcoin/pool'); diff --git a/lib/bcoin/fullchain.js b/lib/bcoin/fullchain.js index f2fcff90..30aa9d52 100644 --- a/lib/bcoin/fullchain.js +++ b/lib/bcoin/fullchain.js @@ -186,7 +186,7 @@ Chain.prototype.resetHeight = function resetHeight(height) { var self = this; var ahead = this.index.entries.slice(height + 1); - assert(height <= this.index.entries.length - 1); + assert(height < this.index.entries.length); if (height === this.index.entries.length - 1) return; diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 03aad18e..6650f492 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -42,16 +42,21 @@ function Peer(pool, createSocket, options) { this.destroyed = false; this.ack = false; this.ts = this.options.ts || 0; - this.address = '0.0.0.0'; - this.port = network.port; + + this.host = null; + this.port = 0; if (this.options.backoff) { setTimeout(function() { self.socket = createSocket(self, pool); + if (!self.socket) + throw new Error('No socket'); self.emit('socket'); }, this.options.backoff); } else { this.socket = createSocket(this, pool); + if (!this.socket) + throw new Error('No socket'); } this._broadcast = { @@ -72,7 +77,7 @@ function Peer(pool, createSocket, options) { interval: this.options.pingInterval || 30000 }; - this.setMaxListeners(4000); + this.setMaxListeners(10000); if (this.socket) this._init(); @@ -85,10 +90,19 @@ inherits(Peer, EventEmitter); Peer.prototype._init = function init() { var self = this; + if (!this.host) + this.host = this.socket.remoteAddress || this.socket._host || null; + + if (!this.port) + this.port = this.socket.remotePort || 0; + this.socket.once('connect', function() { self.ts = utils.now(); - self.address = self.socket.remoteAddress; - self.port = self.socket.remotePort; + self.connected = true; + if (!self.host) + self.host = self.socket.remoteAddress; + if (!self.port) + self.port = self.socket.remotePort; }); this.socket.once('error', function(err) { @@ -97,6 +111,7 @@ Peer.prototype._init = function init() { this.socket.once('close', function() { self._error('socket hangup'); + self.connected = false; }); this.socket.on('data', function(chunk) { @@ -115,7 +130,7 @@ Peer.prototype._init = function init() { this.once('version', function() { self.pool.emit('debug', 'Sent version (%s): height=%s', - self.address, this.pool.chain.getStartHeight()); + self.host, this.pool.chain.getStartHeight()); }); } @@ -133,8 +148,10 @@ Peer.prototype._init = function init() { })); this._req('verack', function(err, payload) { - if (err) + if (err) { + self.destroy(); return self._error(err); + } self.ack = true; self.emit('ack'); self.ts = utils.now(); @@ -358,12 +375,12 @@ Peer.prototype._onPacket = function onPacket(packet) { if (cmd === 'merkleblock' || cmd === 'block') { payload.network = true; - payload.relayedBy = this.address; + payload.relayedBy = this.host || '0.0.0.0'; payload = bcoin.block(payload, cmd); this.lastBlock = payload; } else if (cmd === 'tx') { payload.network = true; - payload.relayedBy = this.address; + payload.relayedBy = this.host || '0.0.0.0'; payload = bcoin.tx(payload, this.lastBlock); } @@ -411,11 +428,14 @@ Peer.prototype._handleAddr = function handleAddr(addrs) { service: addr.service, ipv4: addr.ipv4, ipv6: addr.ipv6, - address: addr.ipv4, - address6: addr.ipv6, + host: addr.ipv4, + host6: addr.ipv6, port: addr.port, - host: addr.ipv4 + ':' + addr.port, - host6: '[' + addr.ipv6 + ']:' + addr.port + addr: addr.ipv4 + ':' + addr.port, + addr6: '[' + addr.ipv6 + ']:' + addr.port, + // Deprecated: + address: addr.ipv4, + address6: addr.ipv6 }); }, this); }; @@ -432,21 +452,15 @@ Peer.prototype._handleGetAddr = function handleGetAddr() { var used = []; var peers, addrs; - peers = [].concat( - this.pool.peers.pending, - this.pool.peers.block, - this.pool.peers.load - ).filter(Boolean); - // NOTE: For IPv6 BTC uses: // '0000:0000:0000:0000:0000:xxxx:xxxx:ffff' - peers = peers.map(function(peer) { + peers = this.pool.peers.all.map(function(peer) { if (!peer.socket || !peer.socket.remoteAddress) return; return { host: peer.socket.remoteAddress, - port: peer.socket.remotePort || 8333, + port: peer.socket.remotePort || network.port, ts: peer.ts }; }).filter(function(peer) { @@ -532,14 +546,14 @@ Peer.prototype._handleHeaders = function handleHeaders(headers) { Peer.prototype.loadHeaders = function loadHeaders(hashes, stop) { this.emit('debug', 'Requesting headers packet from %s with getheaders', - this.address); + this.host); this._write(this.framer.getHeaders(hashes, stop)); }; Peer.prototype.loadBlocks = function loadBlocks(hashes, stop) { this.emit('debug', 'Requesting inv packet from %s with getblocks', - this.address); + this.host); this._write(this.framer.getBlocks(hashes, stop)); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index fcbf2abe..2aa3a3ec 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -43,6 +43,10 @@ function Pool(options) { this.size = options.size || 32; this.parallel = options.parallel || 2000; this.redundancy = options.redundancy || 2; + this.seeds = network.seeds.slice(); + + this._createConnection = options.createConnection; + this._createSocket = options.createSocket; if (!this.options.fullNode) { if (this.options.headers == null) @@ -79,9 +83,7 @@ function Pool(options) { this.maxRetries = options.maxRetries || 42; this.requestTimeout = options.requestTimeout || 10000; - Chain = bcoin.fullChain; - - this.chain = new Chain({ + this.chain = new bcoin.chain({ storage: this.storage, fullNode: this.options.fullNode }); @@ -100,7 +102,9 @@ function Pool(options) { // Peers that are still connecting pending: [], // Peers that are loading block ids - load: null + load: null, + // All peers + all: [] }; this.block = { @@ -127,7 +131,7 @@ function Pool(options) { }; // Currently broadcasted TXs - this.tx = { + this.inv = { list: [], timeout: options.txTimeout || 60000 }; @@ -136,9 +140,6 @@ function Pool(options) { this.options.wallets = this.options.wallets || []; this.wallets = []; - this.createSocket = options.createConnection || options.createSocket; - assert(this.createSocket); - this.chain.on('debug', function() { var args = Array.prototype.slice.call(arguments); self.emit.apply(self, ['debug'].concat(args)); @@ -240,6 +241,34 @@ Pool.prototype._stopInterval = function _stopInterval() { delete this._interval; }; +Pool.prototype.createConnection = function createConnection(peer, pool) { + var addr, parts, host, net, socket, port; + + if (pool._createConnection) + return pool._createConnection(peer, pool); + + addr = pool.usableSeed(true); + + parts = addr.split(':'); + host = parts[0]; + port = +parts[1] || network.port; + + peer.host = host; + peer.port = port; + + if (this._createSocket) { + socket = this._createSocket(port, host); + } else { + net = require('net'); + socket = net.connect(port, host); + } + + socket.on('connect', function() { + pool.emit('debug', 'Connected to %s:%d', host, port); + }); + + return socket; +}; Pool.prototype._addLoader = function _addLoader() { var self = this; @@ -251,13 +280,14 @@ Pool.prototype._addLoader = function _addLoader() { if (this.peers.load != null) return; - peer = new bcoin.peer(this, this.createSocket, { + peer = new bcoin.peer(this, this.createConnection, { backoff: 750 * Math.random(), startHeight: this.options.startHeight, relay: this.options.relay }); this.peers.load = peer; + this.peers.all.push(peer); peer.on('error', function(err) { self.emit('error', err, peer); @@ -278,10 +308,6 @@ Pool.prototype._addLoader = function _addLoader() { }); peer.once('ack', function() { - if (!peer.socket) { - peer.destroy(); - return; - } peer.updateWatch(); if (!self._load()) self._stopTimer(); @@ -330,7 +356,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { this.emit('debug', 'Recieved %s headers from %s', headers.length, - peer.address); + peer.host); if (headers.length > 2000) { peer.destroy(); @@ -376,7 +402,7 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { this.emit('debug', 'Requesting %s block packets from %s with getdata', this.request.queue.length, - peer.address + peer.host ); }; @@ -391,7 +417,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this.emit('debug', 'Recieved %s block hashes from %s', hashes.length, - peer.address); + peer.host); if (hashes.length > 500) { peer.destroy(); @@ -431,7 +457,7 @@ Pool.prototype._handleBlocks = function _handleBlocks(hashes, peer) { this.emit('debug', 'Requesting %s block packets from %s with getdata', this.request.queue.length, - peer.address + peer.host ); }; @@ -584,13 +610,19 @@ Pool.prototype._addPeer = function _addPeer(backoff) { if (this.peers.block.length + this.peers.pending.length >= this.size) return; - peer = new bcoin.peer(this, this.createSocket, { + if (!this._createConnection && !this.usableSeed()) { + setTimeout(this._addPeer.bind(this, 0), this.backoff.max); + return; + } + + peer = new bcoin.peer(this, this.createConnection, { backoff: backoff, startHeight: this.options.startHeight, relay: this.options.relay }); this.peers.pending.push(peer); + this.peers.all.push(peer); peer._retry = 0; @@ -608,17 +640,12 @@ Pool.prototype._addPeer = function _addPeer(backoff) { self._removePeer(peer); if (self.destroyed) return; - self._addPeer(Math.max(backoff + self.backoff.delta, self.backoff.max)); + self._addPeer(Math.min(backoff + self.backoff.delta, self.backoff.max)); }); peer.once('ack', function() { var i; - if (!peer.socket) { - peer.destroy(); - return; - } - if (self.destroyed) return; @@ -630,8 +657,8 @@ Pool.prototype._addPeer = function _addPeer(backoff) { peer.updateWatch(); - self.tx.list.forEach(function(entry) { - var result = peer.broadcast(entry.tx); + self.inv.list.forEach(function(entry) { + var result = peer.broadcast(entry.msg); if (!result) return; @@ -677,6 +704,16 @@ Pool.prototype._addPeer = function _addPeer(backoff) { }); peer.on('addr', function(addr) { + var host = addr.ipv4 + ':' + addr.port; + + if (self.seeds.length > self.size * 2) + self.seeds = network.seeds.slice(); + + if (!~self.seeds.indexOf(host)) { + self.emit('debug', 'Found new peer: %s', host); + self.seeds.push(host); + } + self.emit('addr', addr, peer); }); @@ -720,6 +757,10 @@ Pool.prototype._removePeer = function _removePeer(peer) { if (i !== -1) this.peers.block.splice(i, 1); + i = this.peers.all.indexOf(peer); + if (i !== -1) + this.peers.all.splice(i, 1); + if (this.peers.load === peer) this.peers.load = null; }; @@ -1178,7 +1219,7 @@ Pool.prototype.getBlock = function getBlock(hash, cb) { }; Pool.prototype.sendBlock = function sendBlock(block) { - return this.sendTX(block); + return this.broadcast(block); }; Pool.prototype.getTX = function getTX(hash, range, cb) { @@ -1248,23 +1289,27 @@ Pool.prototype.getTX = function getTX(hash, range, cb) { }; Pool.prototype.sendTX = function sendTX(tx) { + return this.broadcast(tx); +}; + +Pool.prototype.broadcast = function broadcast(msg) { var self = this; var e = new EventEmitter(); var entry = { - tx: tx, + msg: msg, e: e, timer: setTimeout(function() { - var i = self.tx.list.indexOf(entry); + var i = self.inv.list.indexOf(entry); if (i !== -1) - self.tx.list.splice(i, 1); - }, this.tx.timeout) + self.inv.list.splice(i, 1); + }, this.inv.timeout) }; - this.tx.list.push(entry); + this.inv.list.push(entry); this.peers.block.forEach(function(peer) { - var result = peer.broadcast(tx); + var result = peer.broadcast(msg); if (!result) return; result[0].once('request', function() { e.emit('ack', peer); @@ -1287,9 +1332,9 @@ Pool.prototype.destroy = function destroy() { item.finish(null); }); - this.tx.list.forEach(function(tx) { - clearTimeout(tx.timer); - tx.timer = null; + this.inv.list.forEach(function(entry) { + clearTimeout(entry.timer); + entry.timer = null; }); this.peers.pending.slice().forEach(function(peer) { @@ -1306,6 +1351,52 @@ Pool.prototype.destroy = function destroy() { this.load.timer = null; }; +Pool.prototype.getPeer = function getPeer(addr) { + var parts, host, port, i, peer; + + if (!addr) + return; + + parts = addr.split(':'); + host = parts[0]; + port = +parts[1] || network.port; + + for (i = 0; i < this.peers.all.length; i++) { + peer = this.peers.all[i]; + if (peer.host === host) + return peer; + } +}; + +Pool.prototype.usableSeed = function usableSeed(addrs, force) { + var i, addr; + + if (typeof addrs === 'boolean') { + force = addrs; + addrs = null; + } + + if (!addrs) + addrs = this.seeds; + + for (i = 0; i < addrs.length; i++) { + if (!this.getPeer(addrs[i])) + return addrs[i]; + } + + if (!force) + return; + + if (addrs.length === 1) + return addrs[0]; + + do { + addr = addrs[Math.random() * (addrs.length - 1) | 0]; + } while (this.peers.load && this.getPeer(addr) === this.peers.load); + + return addr; +}; + Pool.prototype.toJSON = function toJSON() { return { v: 1,