From 6cab97b2654474923635300f6a6b392b3ed2b7c2 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Mon, 8 Feb 2016 18:46:09 -0800 Subject: [PATCH] listen on server socket for leech peers. --- lib/bcoin/peer.js | 58 +++++++++++-- lib/bcoin/pool.js | 216 +++++++++++++++++++++++++++++++--------------- 2 files changed, 197 insertions(+), 77 deletions(-) diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 88fb8c1e..a619d344 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -18,17 +18,24 @@ var network = bcoin.protocol.network; * Peer */ -function Peer(pool, createConnection, options) { +function Peer(pool, options) { var self = this; if (!(this instanceof Peer)) - return new Peer(pool, createConnection, options); + return new Peer(pool, options); EventEmitter.call(this); - this.options = options || {}; + if (!options) + options = {}; + + this.options = options; this.pool = pool; this.socket = null; + this.host = null; + this.port = 0; + this._createSocket = this.options.createSocket; + this.priority = this.options.priority; this.parser = new bcoin.protocol.parser(); this.framer = new bcoin.protocol.framer(); this.chain = this.pool.chain; @@ -39,9 +46,6 @@ function Peer(pool, createConnection, options) { this.connected = false; this.ts = this.options.ts || 0; - this.host = null; - this.port = 0; - this.challenge = null; this.lastPong = 0; @@ -49,7 +53,17 @@ function Peer(pool, createConnection, options) { this.orphans = 0; this.orphanTime = 0; - this.socket = createConnection.call(pool, this, options); + if (options.socket) { + this.socket = options.socket; + this.host = this.socket.remoteAddress; + this.port = this.socket.remotePort; + assert(this.host); + assert(this.port != null); + } else if (options.seed) { + options.seed = utils.parseHost(options.seed); + this.socket = this.createSocket(options.seed.port, options.seed.host); + } + if (!this.socket) throw new Error('No socket'); @@ -168,6 +182,36 @@ Peer.prototype._init = function init() { })); }; +Peer.prototype.createSocket = function createSocket(port, host) { + var net, socket; + + assert(port != null); + assert(host); + + this.host = host; + this.port = port; + + if (this._createSocket) { + socket = this._createSocket(port, host); + } else { + if (!bcoin.net) + throw new Error('Please include a `createSocket` callback.'); + socket = bcoin.net.connect(port, host); + } + + utils.debug( + 'Connecting to %s:%d (priority=%s)', + host, port, this.priority); + + socket.on('connect', function() { + utils.debug( + 'Connected to %s:%d (priority=%s)', + host, port, this.priority); + }); + + return socket; +}; + Peer.prototype.broadcast = function broadcast(items) { var self = this; var result; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 29d21c2b..b2442c31 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -54,11 +54,10 @@ function Pool(options) { this.originalSeeds = (options.seeds || network.seeds).map(utils.parseHost); this.setSeeds([]); + this.server = null; this.destroyed = false; this.size = options.size || 32; - this._createSocket = options.createSocket; - if (!this.options.fullNode) { if (this.options.headers == null) this.options.headers = true; @@ -99,9 +98,11 @@ function Pool(options) { this.peers = { // Peers that are loading blocks themselves - block: [], + regular: [], // Peers that are still connecting pending: [], + // Peers that connected to us + leeches: [], // Peers that are loading block ids load: null, // All peers @@ -234,11 +235,46 @@ Pool.prototype._init = function _init() { this.emit('full'); utils.debug('Chain is fully synced (height=%d).', this.chain.height()); } + + this.startServer(); }; Pool.prototype.startServer = function startServer() { - bcoin.net.createServer(function(socket) { + var self = this; + + if (!bcoin.net) + return; + + if (!this.options.listen) + return; + + assert(!this.server); + + this.server = new bcoin.net.Server(); + + this.server.on('connection', function(socket) { + self._addLeech(socket); + }) + + this.server.on('listening', function() { + var data = self.server.address(); + utils.debug( + 'Bitcoin server listening on %s (port=%d)', + data.address, data.port); }); + + this.server.listen(network.port, '0.0.0.0'); +}; + +Pool.prototype.stopServer = function stopServer() { + if (!bcoin.net) + return; + + if (!this.server) + return; + + this.server.close(); + delete this.server; }; Pool.prototype._startTimer = function _startTimer() { @@ -300,38 +336,6 @@ Pool.prototype._stopInterval = function _stopInterval() { delete this._interval; }; -Pool.prototype.createConnection = function createConnection(peer, options) { - var addr, net, socket; - - addr = this.getSeed(options.priority); - - assert(addr); - assert(addr.host); - - peer.host = addr.host; - peer.port = addr.port; - - if (this._createSocket) { - socket = this._createSocket(addr.port, addr.host); - } else { - if (!bcoin.net) - throw new Error('Please include a `createSocket` callback.'); - socket = bcoin.net.connect(addr.port, addr.host); - } - - utils.debug( - 'Connecting to %s:%d (priority=%s)', - addr.host, addr.port, options.priority); - - socket.on('connect', function() { - utils.debug( - 'Connected to %s:%d (priority=%s)', - addr.host, addr.port, options.priority); - }); - - return socket; -}; - Pool.prototype._addLoader = function _addLoader() { var self = this; var peer; @@ -342,7 +346,12 @@ Pool.prototype._addLoader = function _addLoader() { if (this.peers.load != null) return; - peer = this._createPeer(true); + peer = this._createPeer({ + seed: this.getSeed(true), + priority: true + }); + + assert(peer); utils.debug('Added loader peer: %s', peer.host); @@ -464,8 +473,8 @@ Pool.prototype._handleHeaders = function _handleHeaders(headers, peer) { blockPeer = peer; // if (this.options.multiplePeers) { - // if (this.peers.block.length) { - // blockPeer = this.peers.block[i % (this.peers.block.length + 1)]; + // if (this.peers.regular.length) { + // blockPeer = this.peers.regular[i % (this.peers.regular.length + 1)]; // if (!blockPeer) // blockPeer = this.peers.load; // } @@ -720,19 +729,20 @@ Pool.prototype.loadMempool = function loadMempool() { if (this.peers.load) this.peers.load.loadMempool(); - this.peers.block.forEach(function(peer) { + this.peers.regular.forEach(function(peer) { peer.loadMempool(); }); }; -Pool.prototype._createPeer = function _createPeer(priority, socket) { +Pool.prototype._createPeer = function _createPeer(options) { var self = this; - var peer = new bcoin.peer(this, this.createConnection, { - startHeight: this.options.startHeight, + var peer = new bcoin.peer(this, { + seed: options.seed, + createSocket: this.options.createSocket, relay: this.options.relay, - priority: priority, - socket: socket + priority: options.priority, + socket: options.socket }); peer.on('error', function(err) { @@ -808,30 +818,90 @@ Pool.prototype._handleTX = function _handleTX(tx, peer) { this.emit('watched', tx, peer); }; -Pool.prototype._addPeer = function _addPeer(socket) { - var self = this; +Pool.prototype._addLeech = function _addLeech(socket) { var peer; - if (!socket) { - if (this.destroyed) + if (this.destroyed) + return socket.destroy(); + + peer = this._createPeer({ + socket: socket, + priority: false + }); + + assert(peer); + + this.peers.leeches.push(peer); + this.peers.all.push(peer); + + peer.once('close', function() { + self._removePeer(peer); + }); + + peer.once('ack', function() { + var i; + + if (self.destroyed) return; - if (this.peers.block.length + this.peers.pending.length >= this.size) - return; + peer.updateWatch(); - if (!this.getSeed()) { - setTimeout(this._addPeer.bind(this), 5000); - return; - } + self.inv.list.forEach(function(entry) { + var result = peer.broadcast(entry.msg); + if (!result) + return; - peer = this._createPeer(false); - } else { - if (this.destroyed) - return socket.destroy(); + result[0].once('request', function() { + entry.e.emit('ack', peer); + }); - peer = this._createPeer(false, socket); + result[0].once('reject', function(payload) { + entry.e.emit('reject', payload, peer); + }); + }); + }); + + peer.on('merkleblock', function(block) { + self._handleBlock(block, peer); + }); + + peer.on('block', function(block) { + self._handleBlock(block, peer); + }); + + peer.on('blocks', function(hashes) { + self._handleInv(hashes, peer); + }); + + utils.nextTick(function() { + self.emit('leech', peer); + }); + + return peer; +}; + +Pool.prototype._addPeer = function _addPeer() { + var self = this; + var peer, seed; + + if (this.destroyed) + return; + + if (this.peers.regular.length + this.peers.pending.length >= this.size) + return; + + seed = this.getSeed(false); + + if (!seed) { + setTimeout(this._addPeer.bind(this), 5000); + return; } + peer = this._createPeer({ + seed: seed, + priority: false + }); + this.peers.pending.push(peer); this.peers.all.push(peer); @@ -851,7 +921,7 @@ Pool.prototype._addPeer = function _addPeer(socket) { i = self.peers.pending.indexOf(peer); if (i !== -1) { self.peers.pending.splice(i, 1); - self.peers.block.push(peer); + self.peers.regular.push(peer); } peer.updateWatch(); @@ -914,7 +984,7 @@ Pool.prototype._addTX = function(hash, state) { }; Pool.prototype.bestPeer = function bestPeer() { - return this.peers.block.reduce(function(best, peer) { + return this.peers.regular.reduce(function(best, peer) { if (!peer.version || !peer.socket) return; @@ -930,9 +1000,13 @@ Pool.prototype._removePeer = function _removePeer(peer) { if (i !== -1) this.peers.pending.splice(i, 1); - i = this.peers.block.indexOf(peer); + i = this.peers.regular.indexOf(peer); if (i !== -1) - this.peers.block.splice(i, 1); + this.peers.regular.splice(i, 1); + + i = this.peers.leeches.indexOf(peer); + if (i !== -1) + this.peers.leeches.splice(i, 1); i = this.peers.all.indexOf(peer); if (i !== -1) @@ -978,8 +1052,8 @@ Pool.prototype.watch = function watch(id) { if (self.peers.load) self.peers.load.updateWatch(); - for (i = 0; i < self.peers.block.length; i++) - self.peers.block[i].updateWatch(); + for (i = 0; i < self.peers.regular.length; i++) + self.peers.regular[i].updateWatch(); }); }; @@ -1015,8 +1089,8 @@ Pool.prototype.unwatch = function unwatch(id) { if (self.peers.load) self.peers.load.updateWatch(); - for (i = 0; i < self.peers.block.length; i++) - self.peers.block[i].updateWatch(); + for (i = 0; i < self.peers.regular.length; i++) + self.peers.regular[i].updateWatch(); }); }; @@ -1532,7 +1606,7 @@ Pool.prototype.broadcast = function broadcast(msg) { this.inv.list.push(entry); - this.peers.block.forEach(function(peer) { + this.peers.regular.forEach(function(peer) { var result = peer.broadcast(msg); if (!result) return; result[0].once('request', function() { @@ -1565,9 +1639,11 @@ Pool.prototype.destroy = function destroy() { peer.destroy(); }); - this.peers.block.slice().forEach(function(peer) { + this.peers.regular.slice().forEach(function(peer) { peer.destroy(); }); + + this.stopServer(); }; Pool.prototype.getPeer = function getPeer(addr) {