From 4692849593481ced3ab83d2144641e747e3d520f Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sat, 21 Jan 2017 03:01:18 -0800 Subject: [PATCH] peer: abstract all deps. --- lib/blockchain/chain.js | 2 +- lib/mempool/mempool.js | 3 - lib/net/framer.js | 19 +- lib/net/parser.js | 50 +- lib/net/peer.js | 989 +++++++++++++++++++------------------- lib/net/pool.js | 617 ++++++++++++++++-------- lib/protocol/network.js | 21 +- lib/protocol/networks.js | 10 +- lib/utils/asyncemitter.js | 315 ++++++++++++ 9 files changed, 1289 insertions(+), 737 deletions(-) create mode 100644 lib/utils/asyncemitter.js diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 948a42be..5023631b 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -1347,7 +1347,7 @@ Chain.prototype.verifyCheckpoint = function verifyCheckpoint(prev, hash) { if (!this.options.useCheckpoints) return true; - checkpoint = this.network.checkpoints[height]; + checkpoint = this.network.checkpointMap[height]; if (!checkpoint) return true; diff --git a/lib/mempool/mempool.js b/lib/mempool/mempool.js index 450ee4e7..a8d1bb29 100644 --- a/lib/mempool/mempool.js +++ b/lib/mempool/mempool.js @@ -18,14 +18,11 @@ var Bloom = require('../utils/bloom'); var Address = require('../primitives/address'); var Coin = require('../primitives/coin'); var Script = require('../script/script'); -var Lock = require('../utils/lock'); var Outpoint = require('../primitives/outpoint'); var TX = require('../primitives/tx'); var Coin = require('../primitives/coin'); var TXMeta = require('../primitives/txmeta'); var MempoolEntry = require('./mempoolentry'); -var CoinView = require('../coins/coinview'); -var Coins = require('../coins/coins'); var Network = require('../protocol/network'); var VerifyError = errors.VerifyError; var VerifyResult = errors.VerifyResult; diff --git a/lib/net/framer.js b/lib/net/framer.js index 424fa97b..fa822187 100644 --- a/lib/net/framer.js +++ b/lib/net/framer.js @@ -7,28 +7,22 @@ 'use strict'; +var assert = require('assert'); var Network = require('../protocol/network'); var crypto = require('../crypto/crypto'); -var assert = require('assert'); /** * Protocol packet framer * @exports Framer * @constructor - * @param {Object} options + * @param {Network} network */ -function Framer(options) { +function Framer(network) { if (!(this instanceof Framer)) - return new Framer(options); + return new Framer(network); - if (!options) - options = {}; - - this.options = options; - - this.network = Network.get(options.network); - this.bip151 = options.bip151; + this.network = Network.get(network); } /** @@ -43,9 +37,6 @@ Framer.prototype.packet = function packet(cmd, payload, checksum) { assert(payload, 'No payload.'); - if (this.bip151 && this.bip151.handshake) - return this.bip151.packet(cmd, payload); - assert(cmd.length < 12); assert(payload.length <= 0xffffffff); diff --git a/lib/net/parser.js b/lib/net/parser.js index 29523f1d..9a54a83e 100644 --- a/lib/net/parser.js +++ b/lib/net/parser.js @@ -7,11 +7,11 @@ 'use strict'; -var Network = require('../protocol/network'); +var assert = require('assert'); var EventEmitter = require('events').EventEmitter; +var Network = require('../protocol/network'); var util = require('../utils/util'); var crypto = require('../crypto/crypto'); -var assert = require('assert'); var common = require('./common'); var packets = require('./packets'); @@ -19,56 +19,27 @@ var packets = require('./packets'); * Protocol packet parser * @exports Parser * @constructor - * @param {Object?} options + * @param {Network} network * @emits Parser#error * @emits Parser#packet */ -function Parser(options) { +function Parser(network) { if (!(this instanceof Parser)) - return new Parser(options); - - if (!options) - options = {}; + return new Parser(network); EventEmitter.call(this); - this.network = Network.get(options.network); - this.bip151 = options.bip151; + this.network = Network.get(network); this.pending = []; this.total = 0; this.waiting = 24; this.header = null; - - this._init(); } util.inherits(Parser, EventEmitter); -/** - * Initialize. Bind to events. - * @private - * @param {String} str - */ - -Parser.prototype._init = function _init(str) { - var self = this; - - if (!this.bip151) - return; - - this.bip151.on('packet', function(cmd, body) { - var payload; - try { - payload = self.parsePayload(cmd, body); - } catch (e) { - return self.error(e); - } - self.emit('packet', payload); - }); -}; - /** * Emit an error. * @private @@ -88,9 +59,6 @@ Parser.prototype.error = function error() { Parser.prototype.feed = function feed(data) { var chunk, off, len; - if (this.bip151 && this.bip151.handshake) - return this.bip151.feed(data); - this.total += data.length; this.pending.push(data); @@ -135,7 +103,8 @@ Parser.prototype.parse = function parse(data) { if (checksum !== this.header.checksum) { this.waiting = 24; this.header = null; - return this.error('Invalid checksum: %d.', util.hex32(checksum)); + this.error('Invalid checksum: %d.', util.hex32(checksum)); + return; } try { @@ -179,7 +148,8 @@ Parser.prototype.parseHeader = function parseHeader(data) { if (size > common.MAX_MESSAGE) { this.waiting = 24; - return this.error('Packet length too large: %dmb.', util.mb(size)); + this.error('Packet length too large: %dmb.', util.mb(size)); + return; } this.waiting = size; diff --git a/lib/net/peer.js b/lib/net/peer.js index bfa16f47..92df0938 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -27,6 +27,9 @@ var Block = require('../primitives/block'); var TX = require('../primitives/tx'); var encoding = require('../utils/encoding'); var NetAddress = require('../primitives/netaddress'); +var Network = require('../protocol/network'); +var Logger = require('../node/logger'); +var tcp = require('./tcp'); var services = common.services; var invTypes = InvItem.types; var packetTypes = packets.types; @@ -35,18 +38,11 @@ var packetTypes = packets.types; * Represents a remote peer. * @exports Peer * @constructor - * @param {Pool} pool - * @param {NetAddress} address - * @param {net.Socket?} socket - * @property {Pool} pool - * @property {net.Socket?} socket - * @property {String} host - * @property {Number} port - * @property {String} hostname + * @param {PeerOptions} options + * @property {net.Socket} socket + * @property {NetAddress} address * @property {Parser} parser * @property {Framer} framer - * @property {Chain} chain - * @property {Mempool} mempool * @property {Number} version * @property {Boolean} destroyed * @property {Boolean} ack - Whether verack has been received. @@ -69,25 +65,24 @@ var packetTypes = packets.types; * @emits Peer#ack */ -function Peer(pool) { +function Peer(options) { if (!(this instanceof Peer)) - return new Peer(pool); + return new Peer(options); EventEmitter.call(this); - this.pool = pool; - this.options = pool.options; + this.options = options; this.network = this.options.network; this.logger = this.options.logger; - this.chain = this.options.chain; - this.mempool = this.options.mempool; this.locker = new Lock(); - this.next = null; - this.prev = null; + this.parser = new Parser(this.network); + this.framer = new Framer(this.network); this.socket = null; + this.opened = false; this.outbound = false; + this.loader = false; this.address = new NetAddress(); this.connected = false; this.destroyed = false; @@ -101,6 +96,10 @@ function Peer(pool) { this.drainQueue = []; this.banScore = 0; this.invQueue = []; + this.onPacket = null; + + this.next = null; + this.prev = null; this.version = -1; this.services = 0; @@ -139,22 +138,6 @@ function Peer(pool) { this.responseMap = new Map(); this.compactBlocks = new Map(); - if (this.options.bip151) { - this.bip151 = new BIP151(); - if (this.options.bip150) { - this.bip150 = new BIP150( - this.bip151, - this.hostname, - this.outbound, - this.pool.authdb, - this.options.identityKey); - this.bip151.bip150 = this.bip150; - } - } - - this.parser = new Parser(this); - this.framer = new Framer(this); - this._init(); } @@ -167,7 +150,7 @@ util.inherits(Peer, EventEmitter); * @default */ -Peer.DRAIN_MAX = 5 << 20; +Peer.DRAIN_MAX = 10 << 20; /** * Timeout for peer to read from @@ -227,40 +210,40 @@ Peer.RESPONSE_TIMEOUT = 30000; Peer.BLOCK_TIMEOUT = 60000; /** - * Getter to retrieve host. - * @function - * @name host(get) - * @memberof Peer# - * @returns {String} + * Create inbound peer from socket. + * @param {PeerOptions} options + * @param {net.Socket} socket + * @returns {Peer} */ -Peer.prototype.__defineGetter__('host', function() { - return this.address.host; -}); +Peer.fromInbound = function fromInbound(options, socket) { + var peer = new Peer(options); + peer.accept(socket); + return peer; +}; /** - * Getter to retrieve port. - * @function - * @name port(get) - * @memberof Peer# - * @returns {Number} + * Create outbound peer from net address. + * @param {PeerOptions} options + * @param {NetAddress} addr + * @returns {Peer} */ -Peer.prototype.__defineGetter__('port', function() { - return this.address.port; -}); +Peer.fromOutbound = function fromOutbound(options, addr) { + var peer = new Peer(options); + peer.connect(addr); + return peer; +}; /** - * Getter to retrieve hostname. - * @function - * @name hostname(get) - * @memberof Peer# - * @returns {Number} + * Create a peer from options. + * @param {Object} options + * @returns {Peer} */ -Peer.prototype.__defineGetter__('hostname', function() { - return this.address.hostname; -}); +Peer.fromOptions = function fromOptions(options) { + return new Peer(new PeerOptions(options)); +}; /** * Begin peer initialization. @@ -280,24 +263,105 @@ Peer.prototype._init = function init() { })); this.parser.on('error', function(err) { + if (self.destroyed) + return; + self.error(err); self.sendReject('malformed', 'error parsing message'); self.increaseBan(10); }); +}; - if (this.bip151) { - this.bip151.on('error', function(err) { - self.error(err); - self.destroy(); - }); - this.bip151.on('rekey', function() { - if (self.destroyed) - return; +/** + * Getter to retrieve hostname. + * @returns {String} + */ - self.logger.debug('Rekeying with peer (%s).', self.hostname); - self.send(self.bip151.toRekey()); - }); - } +Peer.prototype.hostname = function hostname() { + return this.address.hostname; +}; + +/** + * Frame a payload with a header. + * @param {String} cmd - Packet type. + * @param {Buffer} payload + * @returns {Buffer} Payload with header prepended. + */ + +Peer.prototype.framePacket = function framePacket(cmd, payload, checksum) { + if (this.bip151 && this.bip151.handshake) + return this.bip151.packet(cmd, payload); + return this.framer.packet(cmd, payload, checksum); +}; + +/** + * Feed data to the parser. + * @param {Buffer} data + */ + +Peer.prototype.feedParser = function feedParser(data) { + if (this.bip151 && this.bip151.handshake) + return this.bip151.feed(data); + return this.parser.feed(data); +}; + +/** + * Set BIP151 cipher type. + * @param {Number} cipher + */ + +Peer.prototype.setCipher = function setCipher(cipher) { + var self = this; + + assert(!this.bip151, 'BIP151 already set.'); + assert(this.socket, 'Peer must be initialized with a socket.'); + assert(!this.opened, 'Cannot set cipher after open.'); + + this.bip151 = new BIP151(cipher); + + this.bip151.on('error', function(err) { + self.error(err); + self.destroy(); + }); + + this.bip151.on('rekey', function() { + if (self.destroyed) + return; + + self.logger.debug('Rekeying with peer (%s).', self.hostname); + self.send(self.bip151.toRekey()); + }); + + this.bip151.on('packet', function(cmd, body) { + var payload; + try { + payload = self.parser.parsePayload(cmd, body); + } catch (e) { + self.parser.error(e); + return; + } + self.parser.emit('packet', payload); + }); +}; + +/** + * Set BIP150 auth. + * @param {AuthDB} db + * @param {Buffer} key + */ + +Peer.prototype.setAuth = function setAuth(db, key) { + var bip151 = this.bip151; + var hostname = this.hostname(); + var outbound = this.outbound; + + assert(this.bip151, 'BIP151 not set.'); + assert(!this.bip150, 'BIP150 already set.'); + assert(this.socket, 'Peer must be initialized with a socket.'); + assert(!this.opened, 'Cannot set auth after open.'); + + this.bip150 = new BIP150(bip151, hostname, outbound, db, key); + this.bip151.bip150 = this.bip150; }; /** @@ -312,10 +376,6 @@ Peer.prototype.bind = function bind(socket) { this.socket = socket; - this.socket.once('connect', function() { - self.logger.info('Connected to %s.', self.hostname); - }); - this.socket.once('error', function(err) { if (!self.connected) return; @@ -325,7 +385,7 @@ Peer.prototype.bind = function bind(socket) { }); this.socket.once('close', function() { - self.error('socket hangup'); + self.error('Socket hangup.'); self.destroy(); }); @@ -338,7 +398,7 @@ Peer.prototype.bind = function bind(socket) { return; self.lastRecv = util.ms(); - self.parser.feed(chunk); + self.feedParser(chunk); }); }; @@ -358,6 +418,8 @@ Peer.prototype.accept = function accept(socket) { this.connected = true; this.bind(socket); + + return socket; }; /** @@ -382,8 +444,6 @@ Peer.prototype.connect = function connect(addr) { this.bind(socket); - this.logger.debug('Connecting to %s.', this.hostname); - return socket; }; @@ -421,6 +481,8 @@ Peer.prototype.open = co(function* open() { */ Peer.prototype._open = co(function* open() { + this.opened = true; + // Connect to peer. yield this.initConnect(); yield this.initStall(); @@ -439,6 +501,7 @@ Peer.prototype._open = co(function* open() { /** * Wait for connection. * @private + * @returns {Promise} */ Peer.prototype.initConnect = function initConnect() { @@ -485,6 +548,7 @@ Peer.prototype.initConnect = function initConnect() { /** * Setup stall timer. * @private + * @returns {Promise} */ Peer.prototype.initStall = function initStall() { @@ -502,6 +566,7 @@ Peer.prototype.initStall = function initStall() { * Handle `connect` event (called immediately * if a socket was passed into peer). * @private + * @returns {Promise} */ Peer.prototype.initBIP151 = co(function* initBIP151() { @@ -513,7 +578,7 @@ Peer.prototype.initBIP151 = co(function* initBIP151() { assert(!this.bip151.completed); - this.logger.info('Attempting BIP151 handshake (%s).', this.hostname); + this.logger.info('Attempting BIP151 handshake (%s).', this.hostname()); this.send(this.bip151.toEncinit()); @@ -529,14 +594,15 @@ Peer.prototype.initBIP151 = co(function* initBIP151() { assert(this.bip151.completed); if (this.bip151.handshake) { - this.logger.info('BIP151 handshake complete (%s).', this.hostname); - this.logger.info('Connection is encrypted (%s).', this.hostname); + this.logger.info('BIP151 handshake complete (%s).', this.hostname()); + this.logger.info('Connection is encrypted (%s).', this.hostname()); } }); /** * Handle post bip151-handshake. * @private + * @returns {Promise} */ Peer.prototype.initBIP150 = co(function* initBIP150() { @@ -551,7 +617,7 @@ Peer.prototype.initBIP150 = co(function* initBIP150() { if (!this.bip151.handshake) throw new Error('BIP151 handshake was not completed for BIP150.'); - this.logger.info('Attempting BIP150 handshake (%s).', this.hostname); + this.logger.info('Attempting BIP150 handshake (%s).', this.hostname()); if (this.bip150.outbound) { if (!this.bip150.peerIdentity) @@ -565,15 +631,16 @@ Peer.prototype.initBIP150 = co(function* initBIP150() { assert(this.bip150.completed); if (this.bip150.auth) { - this.logger.info('BIP150 handshake complete (%s).', this.hostname); + this.logger.info('BIP150 handshake complete (%s).', this.hostname()); this.logger.info('Peer is authed (%s): %s.', - this.hostname, this.bip150.getAddress()); + this.hostname(), this.bip150.getAddress()); } }); /** * Handle post handshake. * @private + * @returns {Promise} */ Peer.prototype.initVersion = co(function* initVersion() { @@ -591,21 +658,25 @@ Peer.prototype.initVersion = co(function* initVersion() { if (this.version === -1) { this.logger.debug( 'Peer sent a verack without a version (%s).', - this.hostname); + this.hostname()); yield this.wait(packetTypes.VERSION, 10000); assert(this.version !== -1); } + if (this.destroyed) + throw new Error('Peer was destroyed during handshake.'); + this.handshake = true; - this.logger.debug('Version handshake complete (%s).', this.hostname); + this.logger.debug('Version handshake complete (%s).', this.hostname()); }); /** * Finalize peer after handshake. * @private + * @returns {Promise} */ Peer.prototype.finalize = co(function* finalize() { @@ -624,15 +695,6 @@ Peer.prototype.finalize = co(function* finalize() { }, Peer.INV_INTERVAL); }); -/** - * Test whether the peer is the loader peer. - * @returns {Boolean} - */ - -Peer.prototype.isLoader = function isLoader() { - return this === this.pool.peers.load; -}; - /** * Broadcast blocks to peer. * @param {Block[]} blocks @@ -693,7 +755,7 @@ Peer.prototype.announceBlock = function announceBlock(blocks) { Peer.prototype.announceTX = function announceTX(txs) { var inv = []; - var i, tx, hash, entry; + var i, tx, hash, rate; if (!this.handshake) return; @@ -726,10 +788,10 @@ Peer.prototype.announceTX = function announceTX(txs) { } // Check the fee filter. - if (this.feeRate !== -1 && this.mempool) { + if (this.feeRate !== -1) { hash = tx.hash('hex'); - entry = this.mempool.getEntry(hash); - if (entry && entry.getRate() < this.feeRate) + rate = this.options.getRate(hash); + if (rate !== -1 && rate < this.feeRate) continue; } @@ -787,7 +849,7 @@ Peer.prototype.flushInv = function flushInv() { this.invQueue.length = 0; this.logger.spam('Serving %d inv items to %s.', - queue.length, this.hostname); + queue.length, this.hostname()); for (i = 0; i < queue.length; i++) { item = queue[i]; @@ -829,6 +891,9 @@ Peer.prototype.sendInv = function sendInv(items) { if (items.length === 0) return; + this.logger.spam('Serving %d inv items to %s.', + items.length, this.hostname()); + for (i = 0; i < items.length; i += 1000) { chunk = items.slice(i, i + 1000); this.send(new packets.InvPacket(chunk)); @@ -861,7 +926,7 @@ Peer.prototype.sendHeaders = function sendHeaders(items) { return; this.logger.spam('Serving %d headers to %s.', - items.length, this.hostname); + items.length, this.hostname()); for (i = 0; i < items.length; i += 2000) { chunk = items.slice(i, i + 2000); @@ -903,10 +968,10 @@ Peer.prototype.sendVersion = function sendVersion() { packet.services = this.options.services; packet.ts = this.network.now(); packet.recv = this.address; - packet.from = this.pool.address; - packet.nonce = this.pool.nonce; + packet.from = this.options.address; + packet.nonce = this.options.createNonce(this.hostname()); packet.agent = this.options.agent; - packet.height = this.chain.height; + packet.height = this.options.getHeight(); packet.noRelay = this.options.noRelay; this.send(packet); }; @@ -937,7 +1002,7 @@ Peer.prototype.sendPing = function sendPing() { } if (this.challenge) { - this.logger.debug('Peer has not responded to ping (%s).', this.hostname); + this.logger.debug('Peer has not responded to ping (%s).', this.hostname()); return; } @@ -1071,7 +1136,7 @@ Peer.prototype.needsDrain = function needsDrain(size) { this.logger.warning( 'Peer is not reading: %dmb buffered (%s).', util.mb(this.drainSize), - this.hostname); + this.hostname()); this.error('Peer stalled (drain).'); this.destroy(); } @@ -1080,6 +1145,7 @@ Peer.prototype.needsDrain = function needsDrain(size) { /** * Potentially timeout peer if it hasn't read. * @private + * @returns {Boolean} */ Peer.prototype.maybeStall = function maybeStall() { @@ -1113,7 +1179,7 @@ Peer.prototype.addTimeout = function addTimeout(packet) { this.request(packetTypes.INV, timeout); break; case packetTypes.GETBLOCKS: - if (!this.chain.synced) + if (!this.options.isFull()) this.request(packetTypes.INV, timeout); break; case packetTypes.GETHEADERS: @@ -1181,15 +1247,12 @@ Peer.prototype.maybeTimeout = function maybeTimeout() { } } - if (!this.syncing || this.chain.synced) - return; - - if (!this.isLoader()) - return; - - if (now > this.lastBlock + Peer.BLOCK_TIMEOUT) { - this.error('Peer is stalling (block).'); - this.destroy(); + if (this.syncing && this.loader && !this.options.isFull()) { + if (now > this.lastBlock + Peer.BLOCK_TIMEOUT) { + this.error('Peer is stalling (block).'); + this.destroy(); + return; + } } }; @@ -1248,8 +1311,10 @@ Peer.prototype.wait = function wait(type, timeout) { return new Promise(function(resolve, reject) { var entry; - if (self.destroyed) - return reject(new Error('Request destroyed.')); + if (self.destroyed) { + reject(new Error('Peer is destroyed (request).')); + return; + } entry = self.request(type); @@ -1294,7 +1359,7 @@ Peer.prototype.send = function send(packet) { */ Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) { - var payload = this.framer.packet(cmd, body, checksum); + var payload = this.framePacket(cmd, body, checksum); this.write(payload); }; @@ -1322,7 +1387,7 @@ Peer.prototype.error = function error(err) { err.message = 'Socket Error: ' + msg; } - err.message += ' (' + this.hostname + ')'; + err.message += ' (' + this.hostname() + ')'; this.emit('error', err); }; @@ -1478,45 +1543,9 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { case packetTypes.PONG: yield this.handlePong(packet); break; - case packetTypes.GETADDR: - yield this.handleGetAddr(packet); - break; - case packetTypes.ADDR: - yield this.handleAddr(packet); - break; - case packetTypes.INV: - yield this.handleInv(packet); - break; - case packetTypes.GETDATA: - yield this.handleGetData(packet); - break; - case packetTypes.NOTFOUND: - yield this.handleNotFound(packet); - break; - case packetTypes.GETBLOCKS: - yield this.handleGetBlocks(packet); - break; - case packetTypes.GETHEADERS: - yield this.handleGetHeaders(packet); - break; - case packetTypes.HEADERS: - yield this.handleHeaders(packet); - break; case packetTypes.SENDHEADERS: yield this.handleSendHeaders(packet); break; - case packetTypes.BLOCK: - yield this.handleBlock(packet); - break; - case packetTypes.TX: - yield this.handleTX(packet); - break; - case packetTypes.REJECT: - yield this.handleReject(packet); - break; - case packetTypes.MEMPOOL: - yield this.handleMempool(packet); - break; case packetTypes.FILTERLOAD: yield this.handleFilterLoad(packet); break; @@ -1526,24 +1555,12 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { case packetTypes.FILTERCLEAR: yield this.handleFilterClear(packet); break; - case packetTypes.MERKLEBLOCK: - yield this.handleMerkleBlock(packet); - break; case packetTypes.FEEFILTER: yield this.handleFeeFilter(packet); break; case packetTypes.SENDCMPCT: yield this.handleSendCmpct(packet); break; - case packetTypes.CMPCTBLOCK: - yield this.handleCmpctBlock(packet); - break; - case packetTypes.GETBLOCKTXN: - yield this.handleGetBlockTxn(packet); - break; - case packetTypes.BLOCKTXN: - yield this.handleBlockTxn(packet); - break; case packetTypes.ENCINIT: yield this.handleEncinit(packet); break; @@ -1559,17 +1576,16 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { case packetTypes.AUTHPROPOSE: yield this.handleAuthPropose(packet); break; - case packetTypes.UNKNOWN: - yield this.handleUnknown(packet); - break; - default: - assert(false, 'Bad packet type.'); - break; } + if (this.onPacket) + yield this.onPacket(packet); + if (entry) entry.resolve(packet); + this.emit('packet', packet); + if (packet.type === packetTypes.UNKNOWN) { this.emit('unknown', packet); return; @@ -1578,110 +1594,6 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { this.emit(packet.cmd, packet); }); -/** - * Handle `filterload` packet. - * @private - * @param {FilterLoadPacket} packet - */ - -Peer.prototype.handleFilterLoad = co(function* handleFilterLoad(packet) { - if (!packet.isWithinConstraints()) { - this.increaseBan(100); - return; - } - - this.spvFilter = packet.filter; - this.noRelay = false; - - yield this.pool.handleFilterLoad(this, packet); -}); - -/** - * Handle `filteradd` packet. - * @private - * @param {FilterAddPacket} packet - */ - -Peer.prototype.handleFilterAdd = co(function* handleFilterAdd(packet) { - var data = packet.data; - - if (data.length > consensus.MAX_SCRIPT_PUSH) { - this.increaseBan(100); - return; - } - - if (this.spvFilter) - this.spvFilter.add(data); - - this.noRelay = false; - - yield this.pool.handleFilterAdd(this, packet); -}); - -/** - * Handle `filterclear` packet. - * @private - * @param {FilterClearPacket} packet - */ - -Peer.prototype.handleFilterClear = co(function* handleFilterClear(packet) { - if (this.spvFilter) - this.spvFilter.reset(); - - this.noRelay = false; - - yield this.pool.handleFilterClear(this, packet); -}); - -/** - * Handle `merkleblock` packet. - * @private - * @param {MerkleBlockPacket} packet - */ - -Peer.prototype.handleMerkleBlock = co(function* handleMerkleBlock(packet) { - yield this.pool.handleMerkleBlock(this, packet); -}); - -/** - * Handle `feefilter` packet. - * @private - * @param {FeeFilterPacket} packet - */ - -Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { - var rate = packet.rate; - - if (!(rate >= 0 && rate <= consensus.MAX_MONEY)) { - this.increaseBan(100); - return; - } - - this.feeRate = rate; - - yield this.pool.handleFeeFilter(this, packet); -}); - -/** - * Handle `getheaders` packet. - * @private - * @param {GetHeadersPacket} packet - */ - -Peer.prototype.handleGetHeaders = co(function* handleGetHeaders(packet) { - yield this.pool.handleGetHeaders(this, packet); -}); - -/** - * Handle `getblocks` packet. - * @private - * @param {GetBlocksPacket} packet - */ - -Peer.prototype.handleGetBlocks = co(function* handleGetBlocks(packet) { - yield this.pool.handleGetBlocks(this, packet); -}); - /** * Handle `version` packet. * @private @@ -1699,7 +1611,7 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { this.noRelay = packet.noRelay; if (!this.network.selfConnect) { - if (util.equal(packet.nonce, this.pool.nonce)) + if (this.options.hasNonce(packet.nonce)) throw new Error('We connected to ourself. Oops.'); } @@ -1735,8 +1647,6 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { } this.send(new packets.VerackPacket()); - - yield this.pool.handleVersion(this, packet); }); /** @@ -1747,54 +1657,12 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { Peer.prototype.handleVerack = co(function* handleVerack(packet) { if (this.ack) { - this.logger.debug('Peer sent duplicate ack (%s).', this.hostname); + this.logger.debug('Peer sent duplicate ack (%s).', this.hostname()); return; } this.ack = true; - this.logger.debug('Received verack (%s).', this.hostname); - - yield this.pool.handleVerack(this, packet); -}); - -/** - * Handle `mempool` packet. - * @private - * @param {MempoolPacket} packet - */ - -Peer.prototype.handleMempool = co(function* handleMempool(packet) { - yield this.pool.handleMempool(this, packet); -}); - -/** - * Handle `getdata` packet. - * @private - * @param {GetDataPacket} packet - */ - -Peer.prototype.handleGetData = co(function* handleGetData(packet) { - yield this.pool.handleGetData(this, packet); -}); - -/** - * Handle `notfound` packet. - * @private - * @param {NotFoundPacket} packet - */ - -Peer.prototype.handleNotFound = co(function* handleNotFound(packet) { - yield this.pool.handleNotFound(this, packet); -}); - -/** - * Handle `addr` packet. - * @private - * @param {AddrPacket} packet - */ - -Peer.prototype.handleAddr = co(function* handleAddr(packet) { - yield this.pool.handleAddr(this, packet); + this.logger.debug('Received verack (%s).', this.hostname()); }); /** @@ -1804,9 +1672,10 @@ Peer.prototype.handleAddr = co(function* handleAddr(packet) { */ Peer.prototype.handlePing = co(function* handlePing(packet) { - if (packet.nonce) - this.send(new packets.PongPacket(packet.nonce)); - yield this.pool.handlePing(this, packet); + if (!packet.nonce) + return; + + this.send(new packets.PongPacket(packet.nonce)); }); /** @@ -1820,17 +1689,17 @@ Peer.prototype.handlePong = co(function* handlePong(packet) { var now = util.ms(); if (!this.challenge) { - this.logger.debug('Peer sent an unsolicited pong (%s).', this.hostname); + this.logger.debug('Peer sent an unsolicited pong (%s).', this.hostname()); return; } if (!util.equal(nonce, this.challenge)) { if (util.equal(nonce, encoding.ZERO_U64)) { - this.logger.debug('Peer sent a zero nonce (%s).', this.hostname); + this.logger.debug('Peer sent a zero nonce (%s).', this.hostname()); this.challenge = null; return; } - this.logger.debug('Peer sent the wrong nonce (%s).', this.hostname); + this.logger.debug('Peer sent the wrong nonce (%s).', this.hostname()); return; } @@ -1840,42 +1709,10 @@ Peer.prototype.handlePong = co(function* handlePong(packet) { this.minPing = now - this.lastPing; this.minPing = Math.min(this.minPing, now - this.lastPing); } else { - this.logger.debug('Timing mismatch (what?) (%s).', this.hostname); + this.logger.debug('Timing mismatch (what?) (%s).', this.hostname()); } this.challenge = null; - - yield this.pool.handlePong(this, packet); -}); - -/** - * Handle `getaddr` packet. - * @private - * @param {GetAddrPacket} packet - */ - -Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) { - yield this.pool.handleGetAddr(this, packet); -}); - -/** - * Handle `inv` packet. - * @private - * @param {InvPacket} packet - */ - -Peer.prototype.handleInv = co(function* handleInv(packet) { - yield this.pool.handleInv(this, packet); -}); - -/** - * Handle `headers` packet. - * @private - * @param {HeadersPacket} packet - */ - -Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { - yield this.pool.handleHeaders(this, packet); }); /** @@ -1886,37 +1723,107 @@ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { Peer.prototype.handleSendHeaders = co(function* handleSendHeaders(packet) { this.preferHeaders = true; - yield this.pool.handleSendHeaders(this, packet); }); /** - * Handle `block` packet. + * Handle `filterload` packet. * @private - * @param {BlockPacket} packet + * @param {FilterLoadPacket} packet */ -Peer.prototype.handleBlock = co(function* handleBlock(packet) { - yield this.pool.handleBlock(this, packet); +Peer.prototype.handleFilterLoad = co(function* handleFilterLoad(packet) { + if (!packet.isWithinConstraints()) { + this.increaseBan(100); + return; + } + + this.spvFilter = packet.filter; + this.noRelay = false; }); /** - * Handle `tx` packet. + * Handle `filteradd` packet. * @private - * @param {TXPacket} packet + * @param {FilterAddPacket} packet */ -Peer.prototype.handleTX = co(function* handleTX(packet) { - yield this.pool.handleTX(this, packet); +Peer.prototype.handleFilterAdd = co(function* handleFilterAdd(packet) { + var data = packet.data; + + if (data.length > consensus.MAX_SCRIPT_PUSH) { + this.increaseBan(100); + return; + } + + if (this.spvFilter) + this.spvFilter.add(data); + + this.noRelay = false; }); /** - * Handle `reject` packet. + * Handle `filterclear` packet. * @private - * @param {RejectPacket} packet + * @param {FilterClearPacket} packet */ -Peer.prototype.handleReject = co(function* handleReject(packet) { - yield this.pool.handleReject(this, packet); +Peer.prototype.handleFilterClear = co(function* handleFilterClear(packet) { + if (this.spvFilter) + this.spvFilter.reset(); + + this.noRelay = false; +}); + +/** + * Handle `feefilter` packet. + * @private + * @param {FeeFilterPacket} packet + */ + +Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { + var rate = packet.rate; + + if (!(rate >= 0 && rate <= consensus.MAX_MONEY)) { + this.increaseBan(100); + return; + } + + this.feeRate = rate; +}); + +/** + * Handle `sendcmpct` packet. + * @private + * @param {SendCmpctPacket} + */ + +Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { + var max = this.options.witness ? 2 : 1; + + if (packet.version > max) { + // Ignore + this.logger.info('Peer request compact blocks version %d (%s).', + packet.version, this.hostname()); + return; + } + + if (packet.mode > 1) { + this.logger.info('Peer request compact blocks mode %d (%s).', + packet.mode, this.hostname()); + return; + } + + // Core witness nodes send this twice + // with both version 1 and 2 (why + // would you even _want_ non-witness + // blocks if you use segwit??). + if (this.compactMode !== -1) + return; + + this.logger.info('Peer initialized compact blocks (%s).', this.hostname()); + + this.compactMode = packet.mode; + this.compactWitness = packet.version === 2; }); /** @@ -1932,8 +1839,6 @@ Peer.prototype.handleEncinit = co(function* handleEncinit(packet) { this.bip151.encinit(packet.publicKey, packet.cipher); this.send(this.bip151.toEncack()); - - yield this.pool.handleEncinit(this, packet); }); /** @@ -1947,8 +1852,6 @@ Peer.prototype.handleEncack = co(function* handleEncack(packet) { return; this.bip151.encack(packet.publicKey); - - yield this.pool.handleEncack(this, packet); }); /** @@ -1966,8 +1869,6 @@ Peer.prototype.handleAuthChallenge = co(function* handleAuthChallenge(packet) { sig = this.bip150.challenge(packet.hash); this.send(new packets.AuthReplyPacket(sig)); - - yield this.pool.handleAuthChallenge(this, packet); }); /** @@ -1986,8 +1887,6 @@ Peer.prototype.handleAuthReply = co(function* handleAuthReply(packet) { if (hash) this.send(new packets.AuthProposePacket(hash)); - - yield this.pool.handleAuthReply(this, packet); }); /** @@ -2005,85 +1904,6 @@ Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(packet) { hash = this.bip150.propose(packet.hash); this.send(new packets.AuthChallengePacket(hash)); - - yield this.pool.handleAuthPropose(this, packet); -}); - -/** - * Handle an unknown packet. - * @private - * @param {UnknownPacket} packet - */ - -Peer.prototype.handleUnknown = co(function* handleUnknown(packet) { - yield this.pool.handleUnknown(this, packet); -}); - -/** - * Handle `sendcmpct` packet. - * @private - * @param {SendCmpctPacket} - */ - -Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { - var max = this.options.witness ? 2 : 1; - - if (packet.version > max) { - // Ignore - this.logger.info('Peer request compact blocks version %d (%s).', - packet.version, this.hostname); - return; - } - - if (packet.mode > 1) { - this.logger.info('Peer request compact blocks mode %d (%s).', - packet.mode, this.hostname); - return; - } - - // Core witness nodes send this twice - // with both version 1 and 2 (why - // would you even _want_ non-witness - // blocks if you use segwit??). - if (this.compactMode !== -1) - return; - - this.logger.info('Peer initialized compact blocks (%s).', this.hostname); - - this.compactMode = packet.mode; - this.compactWitness = packet.version === 2; - - yield this.pool.handleSendCmpct(this, packet); -}); - -/** - * Handle `cmpctblock` packet. - * @private - * @param {CmpctBlockPacket} - */ - -Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { - yield this.pool.handleCmpctBlock(this, packet); -}); - -/** - * Handle `getblocktxn` packet. - * @private - * @param {GetBlockTxnPacket} - */ - -Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { - yield this.pool.handleGetBlockTxn(this, packet); -}); - -/** - * Handle `blocktxn` packet. - * @private - * @param {BlockTxnPacket} - */ - -Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { - yield this.pool.handleBlockTxn(this, packet); }); /** @@ -2106,7 +1926,7 @@ Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) { this.logger.debug( 'Requesting headers packet from peer with getheaders (%s).', - this.hostname); + this.hostname()); this.logger.debug( 'Sending getheaders (hash=%s, stop=%s).', @@ -2123,25 +1943,22 @@ Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) { Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) { var packet = new packets.GetBlocksPacket(locator, stop); - var height = -1; var hash = null; var end = null; - if (packet.locator.length > 0) { - height = this.chain.checkHeight(packet.locator[0]); + if (packet.locator.length > 0) hash = util.revHex(packet.locator[0]); - } if (stop) end = util.revHex(stop); this.logger.debug( 'Requesting inv packet from peer with getblocks (%s).', - this.hostname); + this.hostname()); this.logger.debug( - 'Sending getblocks (height=%d, hash=%s, stop=%s).', - height, hash, end); + 'Sending getblocks (hash=%s, stop=%s).', + hash, end); this.send(packet); }; @@ -2157,13 +1974,13 @@ Peer.prototype.sendMempool = function sendMempool() { if (!(this.services & services.BLOOM)) { this.logger.debug( 'Cannot request mempool for non-bloom peer (%s).', - this.hostname); + this.hostname()); return; } this.logger.debug( 'Requesting inv packet from peer with mempool (%s).', - this.hostname); + this.hostname()); this.send(new packets.MempoolPacket()); }; @@ -2180,15 +1997,15 @@ Peer.prototype.sendReject = function sendReject(code, reason, msg) { if (msg) { this.logger.debug('Rejecting %s %s (%s): ccode=%s reason=%s.', - reject.message, msg.rhash(), this.hostname, code, reason); + reject.message, msg.rhash(), this.hostname(), code, reason); } else { this.logger.debug('Rejecting packet from %s: ccode=%s reason=%s.', - this.hostname, code, reason); + this.hostname(), code, reason); } this.logger.debug( 'Sending reject packet to peer (%s).', - this.hostname); + this.hostname()); this.send(reject); }; @@ -2200,20 +2017,21 @@ Peer.prototype.sendReject = function sendReject(code, reason, msg) { Peer.prototype.sendCompact = function sendCompact(mode) { var version = this.options.witness ? 2 : 1; - this.logger.info('Initializing compact blocks (%s).', this.hostname); + this.logger.info('Initializing compact blocks (%s).', this.hostname()); this.send(new packets.SendCmpctPacket(mode, version)); }; /** * Increase banscore on peer. * @param {Number} score + * @returns {Boolean} */ Peer.prototype.increaseBan = function increaseBan(score) { this.banScore += score; if (this.banScore >= this.options.banScore) { - this.logger.debug('Ban threshold exceeded (%s).', this.hostname); + this.logger.debug('Ban threshold exceeded (%s).', this.hostname()); this.ban(); return true; } @@ -2236,11 +2054,12 @@ Peer.prototype.ban = function ban() { * @param {String} code * @param {String} reason * @param {Number} score + * @returns {Boolean} */ Peer.prototype.reject = function reject(msg, code, reason, score) { this.sendReject(code, reason, msg); - this.increaseBan(score); + return this.increaseBan(score); }; /** @@ -2288,12 +2107,218 @@ Peer.prototype.hasCompact = function hasCompact() { Peer.prototype.inspect = function inspect() { return ''; }; +/** + * PeerOptions + * @constructor + */ + +function PeerOptions(options) { + if (!(this instanceof PeerOptions)) + return new PeerOptions(options); + + this.network = Network.primary; + this.logger = Logger.global; + + this.proxyServer = null; + this.createSocket = tcp.createSocket; + this.version = common.PROTOCOL_VERSION; + this.services = common.LOCAL_SERVICES; + this.address = new NetAddress(); + this.agent = common.USER_AGENT; + this.noRelay = false; + this.spv = false; + this.compact = false; + this.witness = this.network.witness; + this.headers = false; + this.banScore = common.BAN_SCORE; + + this.getHeight = PeerOptions.getHeight; + this.isFull = PeerOptions.isFull; + this.createNonce = PeerOptions.createNonce; + this.hasNonce = PeerOptions.hasNonce; + this.getRate = PeerOptions.getRate; + + if (options) + this.fromOptions(options); +} + +/** + * Inject properties from object. + * @private + * @param {Object} options + * @returns {PeerOptions} + */ + +PeerOptions.prototype.fromOptions = function fromOptions(options) { + assert(options, 'Options are required.'); + + if (options.network != null) { + this.network = Network.get(options.network); + this.witness = this.network.witness; + } + + if (options.logger != null) { + assert(typeof options.logger === 'object'); + this.logger = options.logger; + } + + if (options.proxyServer != null) { + assert(typeof options.proxyServer === 'string'); + this.proxyServer = options.proxyServer; + } + + if (options.createSocket != null) { + assert(typeof options.createSocket === 'function'); + this.createSocket = options.createSocket; + } + + if (options.version != null) { + assert(typeof options.version === 'number'); + this.version = options.version; + } + + if (options.services != null) { + assert(typeof options.services === 'number'); + this.services = options.services; + } + + if (options.address != null) { + assert(typeof options.address === 'object'); + this.address = options.address; + } + + if (options.agent != null) { + assert(typeof options.agent === 'string'); + this.agent = options.agent; + } + + if (options.noRelay != null) { + assert(typeof options.noRelay === 'boolean'); + this.noRelay = options.noRelay; + } + + if (options.spv != null) { + assert(typeof options.spv === 'boolean'); + this.spv = options.spv; + } + + if (options.compact != null) { + assert(typeof options.compact === 'boolean'); + this.compact = options.compact; + } + + if (options.witness != null) { + assert(typeof options.witness === 'boolean'); + this.witness = options.witness; + } + + if (options.headers != null) { + assert(typeof options.headers === 'boolean'); + this.headers = options.headers; + } + + if (options.banScore != null) { + assert(typeof options.banScore === 'number'); + this.banScore = options.banScore; + } + + if (options.getHeight != null) { + assert(typeof options.getHeight === 'function'); + this.getHeight = options.getHeight; + } + + if (options.isFull != null) { + assert(typeof options.isFull === 'function'); + this.isFull = options.isFull; + } + + if (options.createNonce != null) { + assert(typeof options.createNonce === 'function'); + this.createNonce = options.createNonce; + } + + if (options.hasNonce != null) { + assert(typeof options.hasNonce === 'function'); + this.hasNonce = options.hasNonce; + } + + if (options.getRate != null) { + assert(typeof options.getRate === 'function'); + this.getRate = options.getRate; + } + + return this; +}; + +/** + * Instantiate options from object. + * @param {Object} options + * @returns {PeerOptions} + */ + +PeerOptions.fromOptions = function fromOptions(options) { + return new PeerOptions().fromOptions(options); +}; + +/** + * Get the chain height. + * @private + * @returns {Number} + */ + +PeerOptions.getHeight = function getHeight() { + return 0; +}; + +/** + * Test whether the chain is synced. + * @private + * @returns {Boolean} + */ + +PeerOptions.isFull = function isFull() { + return false; +}; + +/** + * Create a version packet nonce. + * @private + * @param {String} hostname + * @returns {Buffer} + */ + +PeerOptions.createNonce = function createNonce() { + return util.nonce(); +}; + +/** + * Test whether version nonce is ours. + * @private + * @param {Buffer} nonce + * @returns {Boolean} + */ + +PeerOptions.hasNonce = function hasNonce() { + return false; +}; + +/** + * Get fee rate for txid. + * @private + * @param {Hash} hash + * @returns {Rate} + */ + +PeerOptions.getRate = function getRate(hash) { + return -1; +}; + /** * RequestEntry * @constructor diff --git a/lib/net/pool.js b/lib/net/pool.js index 92995cec..80ccd98e 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -17,6 +17,7 @@ var common = require('./common'); var NetAddress = require('../primitives/netaddress'); var Address = require('../primitives/address'); var BIP150 = require('./bip150'); +var BIP151 = require('./bip151'); var BIP152 = require('./bip152'); var Bloom = require('../utils/bloom'); var ec = require('../crypto/ec'); @@ -31,7 +32,9 @@ var HostList = require('./hostlist'); var InvItem = require('../primitives/invitem'); var Map = require('../utils/map'); var packets = require('./packets'); +var services = common.services; var invTypes = InvItem.types; +var packetTypes = packets.types; /** * A pool of peers for handling all network activity. @@ -97,8 +100,10 @@ function Pool(options) { this.chain = this.options.chain; this.mempool = this.options.mempool; this.server = this.options.createServer(); - this.locker = new Lock(true); + this.nonces = this.options.nonces; + this.address = this.options.address; + this.locker = new Lock(true); this.connected = false; this.syncing = false; this.nonce = util.nonce(); @@ -115,14 +120,10 @@ function Pool(options) { this.headerChain = new List(); this.headerNext = null; this.headerTip = null; - this.checkpoints = []; this.peers = new PeerList(); this.authdb = new BIP150.AuthDB(this.options); - this.address = new NetAddress(this.options); - this.address.ts = this.network.now(); this.hosts = new HostList(this.options); - this.hosts.address = this.address; if (this.options.spv) this.spvFilter = Bloom.fromRate(20000, 0.001, Bloom.flags.ALL); @@ -142,18 +143,6 @@ util.inherits(Pool, AsyncObject); Pool.prototype._init = function _init() { var self = this; - var keys = Object.keys(this.network.checkpoints); - var i, key, hash, height; - - for (i = 0; i < keys.length; i++) { - key = keys[i]; - hash = this.network.checkpoints[key]; - height = +key; - - this.checkpoints.push(new BlockNode(hash, height)); - } - - this.checkpoints.sort(sortNodes); this.server.on('error', function(err) { self.emit('error', err); @@ -172,10 +161,6 @@ Pool.prototype._init = function _init() { self.emit('listening', data); }); - this.chain.on('block', function(block, entry) { - self.emit('block', block, entry); - }); - this.chain.on('reset', function() { self.resetChain(); self.forceSync(); @@ -473,8 +458,13 @@ Pool.prototype.addLoader = function addLoader() { for (peer = this.peers.head(); peer; peer = peer.next) { if (!peer.outbound) continue; - this.logger.info('Repurposing peer for loader (%s).', peer.hostname); + + this.logger.info( + 'Repurposing peer for loader (%s).', + peer.hostname()); + this.setLoader(peer); + return; } @@ -483,9 +473,9 @@ Pool.prototype.addLoader = function addLoader() { if (!addr) return; - peer = this.createPeer(addr); + peer = this.createOutbound(addr); - this.logger.info('Adding loader peer (%s).', peer.hostname); + this.logger.info('Adding loader peer (%s).', peer.hostname()); this.peers.add(peer); @@ -503,6 +493,10 @@ Pool.prototype.setLoader = function setLoader(peer) { return; assert(peer.outbound); + assert(!this.peers.load); + assert(!peer.loader); + + peer.loader = true; this.peers.load = peer; this.sendSync(peer); @@ -605,14 +599,14 @@ Pool.prototype.sendSync = co(function* sendSync(peer) { if (this.options.witness && !peer.hasWitness()) return false; - if (!peer.isLoader()) { + if (!peer.loader) { if (!this.chain.synced) return false; } // Ask for the mempool if we're synced. if (this.network.requestMempool) { - if (peer.isLoader() && this.chain.synced) + if (peer.loader && this.chain.synced) peer.sendMempool(); } @@ -620,7 +614,7 @@ Pool.prototype.sendSync = co(function* sendSync(peer) { peer.lastBlock = util.ms(); - if (this.pool.headersFirst) { + if (this.headersFirst) { tip = this.chain.tip; watermark = this.headerTip; peer.sendGetHeaders([tip.hash], watermark.hash); @@ -703,8 +697,8 @@ Pool.prototype.getNextTip = function getNextTip(height) { var i, next; if (this.options.useCheckpoints) { - for (i = 0; i < this.checkpoints.length; i++) { - next = this.checkpoints[i]; + for (i = 0; i < this.network.checkpoints.length; i++) { + next = this.network.checkpoints[i]; if (next.height > height) return next; } @@ -770,7 +764,7 @@ Pool.prototype.getBroadcasted = function getBroadcasted(peer, item) { if (type !== entry.type) { this.logger.debug( 'Peer requested item with the wrong type (%s).', - peer.hostname); + peer.hostname()); return; } @@ -779,7 +773,7 @@ Pool.prototype.getBroadcasted = function getBroadcasted(peer, item) { item.isTX() ? 'tx' : 'block', item.rhash(), item.hasWitness() ? 'witness' : 'normal', - peer.hostname); + peer.hostname()); entry.ack(peer); @@ -871,13 +865,23 @@ Pool.prototype.sendBlock = co(function* sendBlock(peer, item, witness) { * @returns {Peer} */ -Pool.prototype.createPeer = function createPeer(addr) { - var peer = new Peer(this); +Pool.prototype.createOutbound = function createOutbound(addr) { + var cipher = BIP151.ciphers.CHACHAPOLY; + var identity = this.options.identityKey; + var peer = Peer.fromOutbound(this.options, addr); - this.bindPeer(peer); this.hosts.markAttempt(addr.hostname); - peer.connect(addr); + if (this.options.bip151) + peer.setCipher(cipher); + + if (this.options.bip150) + peer.setAuth(this.authdb, identity); + + this.bindPeer(peer); + + this.logger.debug('Connecting to %s.', peer.hostname()); + peer.tryOpen(); return peer; @@ -890,12 +894,19 @@ Pool.prototype.createPeer = function createPeer(addr) { * @returns {Peer} */ -Pool.prototype.acceptPeer = function acceptPeer(socket) { - var peer = new Peer(this); +Pool.prototype.createInbound = function createInbound(socket) { + var cipher = BIP151.ciphers.CHACHAPOLY; + var identity = this.options.identityKey; + var peer = Peer.fromInbound(this.options, socket); + + if (this.options.bip151) + peer.setCipher(cipher); + + if (this.options.bip150) + peer.setAuth(this.authdb, identity); this.bindPeer(peer); - peer.accept(socket); peer.tryOpen(); return peer; @@ -904,11 +915,20 @@ Pool.prototype.acceptPeer = function acceptPeer(socket) { /** * Bind to peer events. * @private + * @param {Peer} peer */ Pool.prototype.bindPeer = function bindPeer(peer) { var self = this; + peer.onPacket = function onPacket(packet) { + return self.handlePacket(peer, packet); + }; + + peer.on('error', function(err) { + self.emit('error', err, peer); + }); + peer.once('connect', function() { self.handleConnect(peer); }); @@ -921,15 +941,132 @@ Pool.prototype.bindPeer = function bindPeer(peer) { self.handleClose(peer, connected); }); - peer.on('ban', function() { - self.ban(peer.address); - }); - - peer.on('error', function(err) { - self.emit('error', err, peer); + peer.once('ban', function() { + self.handleBan(peer); }); }; +/** + * Handle peer packet event. + * @private + * @param {Peer} peer + * @param {Packet} packet + * @returns {Promise} + */ + +Pool.prototype.handlePacket = co(function* handlePacket(peer, packet) { + switch (packet.type) { + case packetTypes.VERSION: + yield this.handleVersion(peer, packet); + break; + case packetTypes.VERACK: + yield this.handleVerack(peer, packet); + break; + case packetTypes.PING: + yield this.handlePing(peer, packet); + break; + case packetTypes.PONG: + yield this.handlePong(peer, packet); + break; + case packetTypes.GETADDR: + yield this.handleGetAddr(peer, packet); + break; + case packetTypes.ADDR: + yield this.handleAddr(peer, packet); + break; + case packetTypes.INV: + yield this.handleInv(peer, packet); + break; + case packetTypes.GETDATA: + yield this.handleGetData(peer, packet); + break; + case packetTypes.NOTFOUND: + yield this.handleNotFound(peer, packet); + break; + case packetTypes.GETBLOCKS: + yield this.handleGetBlocks(peer, packet); + break; + case packetTypes.GETHEADERS: + yield this.handleGetHeaders(peer, packet); + break; + case packetTypes.HEADERS: + yield this.handleHeaders(peer, packet); + break; + case packetTypes.SENDHEADERS: + yield this.handleSendHeaders(peer, packet); + break; + case packetTypes.BLOCK: + yield this.handleBlock(peer, packet); + break; + case packetTypes.TX: + yield this.handleTX(peer, packet); + break; + case packetTypes.REJECT: + yield this.handleReject(peer, packet); + break; + case packetTypes.MEMPOOL: + yield this.handleMempool(peer, packet); + break; + case packetTypes.FILTERLOAD: + yield this.handleFilterLoad(peer, packet); + break; + case packetTypes.FILTERADD: + yield this.handleFilterAdd(peer, packet); + break; + case packetTypes.FILTERCLEAR: + yield this.handleFilterClear(peer, packet); + break; + case packetTypes.MERKLEBLOCK: + yield this.handleMerkleBlock(peer, packet); + break; + case packetTypes.FEEFILTER: + yield this.handleFeeFilter(peer, packet); + break; + case packetTypes.SENDCMPCT: + yield this.handleSendCmpct(peer, packet); + break; + case packetTypes.CMPCTBLOCK: + yield this.handleCmpctBlock(peer, packet); + break; + case packetTypes.GETBLOCKTXN: + yield this.handleGetBlockTxn(peer, packet); + break; + case packetTypes.BLOCKTXN: + yield this.handleBlockTxn(peer, packet); + break; + case packetTypes.ENCINIT: + yield this.handleEncinit(peer, packet); + break; + case packetTypes.ENCACK: + yield this.handleEncack(peer, packet); + break; + case packetTypes.AUTHCHALLENGE: + yield this.handleAuthChallenge(peer, packet); + break; + case packetTypes.AUTHREPLY: + yield this.handleAuthReply(peer, packet); + break; + case packetTypes.AUTHPROPOSE: + yield this.handleAuthPropose(peer, packet); + break; + case packetTypes.UNKNOWN: + yield this.handleUnknown(peer, packet); + break; + default: + assert(false, 'Bad packet type.'); + break; + } + + this.emit('packet', packet, peer); + + if (packet.type === packetTypes.UNKNOWN) { + this.emit('unknown', packet, peer); + return; + } + + this.emit(packet.cmd, packet, peer); +}); + /** * Handle peer connect event. * @private @@ -937,10 +1074,12 @@ Pool.prototype.bindPeer = function bindPeer(peer) { */ Pool.prototype.handleConnect = function handleConnect(peer) { - if (!peer.outbound) - return; + this.logger.info('Connected to %s.', peer.hostname()); - this.hosts.markSuccess(peer.hostname); + if (peer.outbound) + this.hosts.markSuccess(peer.hostname()); + + this.emit('peer connect', peer); }; /** @@ -983,12 +1122,23 @@ Pool.prototype.handleOpen = function handleOpen(peer) { this.sendSync(peer); if (peer.outbound) { - this.hosts.markAck(peer.hostname, peer.services); + this.hosts.markAck(peer.hostname(), peer.services); - // If we don't have an ack'd loader yet, use this peer. - if (!this.peers.load || !this.peers.load.handshake) + // If we don't have an ack'd + // loader yet consider it dead. + if (!this.peers.load.handshake) { + assert(this.peers.load.loader); + this.peers.load.loader = false; + this.peers.load = null; + } + + // If we do not have a loader, + // use this peer. + if (!this.peers.load) this.setLoader(peer); } + + this.emit('peer open', peer); }; /** @@ -1000,15 +1150,17 @@ Pool.prototype.handleOpen = function handleOpen(peer) { Pool.prototype.handleClose = co(function* handleClose(peer, connected) { var outbound = peer.outbound; - var loader = peer.isLoader(); + var loader = peer.loader; this.removePeer(peer); if (loader) { - this.logger.info('Removed loader peer (%s).', peer.hostname); + this.logger.info('Removed loader peer (%s).', peer.hostname()); this.resetChain(); } + this.emit('peer close', peer, connected); + if (!this.loaded) return; @@ -1018,6 +1170,17 @@ Pool.prototype.handleClose = co(function* handleClose(peer, connected) { this.refill(); }); +/** + * Handle ban event. + * @private + * @param {Peer} peer + */ + +Pool.prototype.handleBan = function handleBan(peer) { + this.ban(peer.address); + this.emit('ban', peer); +}; + /** * Handle peer version event. * @private @@ -1028,15 +1191,13 @@ Pool.prototype.handleClose = co(function* handleClose(peer, connected) { Pool.prototype.handleVersion = co(function* handleVersion(peer, packet) { this.logger.info( 'Received version (%s): version=%d height=%d services=%s agent=%s', - peer.hostname, + peer.hostname(), packet.version, packet.height, packet.services.toString(2), packet.agent); - this.network.time.add(peer.hostname, packet.ts); - - this.emit('version', packet, peer); + this.network.time.add(peer.hostname(), packet.ts); }); /** @@ -1047,7 +1208,7 @@ Pool.prototype.handleVersion = co(function* handleVersion(peer, packet) { */ Pool.prototype.handleVerack = co(function* handleVerack(peer, packet) { - this.emit('verack', packet, peer); + ; }); /** @@ -1058,7 +1219,7 @@ Pool.prototype.handleVerack = co(function* handleVerack(peer, packet) { */ Pool.prototype.handlePing = co(function* handlePing(peer, packet) { - this.emit('ping', packet, peer); + ; }); /** @@ -1069,7 +1230,7 @@ Pool.prototype.handlePing = co(function* handlePing(peer, packet) { */ Pool.prototype.handlePong = co(function* handlePong(peer, packet) { - this.emit('pong', packet, peer); + ; }); /** @@ -1087,7 +1248,9 @@ Pool.prototype.handleGetAddr = co(function* handleGetAddr(peer, packet) { return; if (peer.sentAddr) { - this.logger.debug('Ignoring repeated getaddr (%s).', peer.hostname); + this.logger.debug( + 'Ignoring repeated getaddr (%s).', + peer.hostname()); return; } @@ -1113,11 +1276,9 @@ Pool.prototype.handleGetAddr = co(function* handleGetAddr(peer, packet) { this.logger.debug( 'Sending %d addrs to peer (%s)', items.length, - peer.hostname); + peer.hostname()); peer.send(new packets.AddrPacket(items)); - - this.emit('getaddr', packet, peer); }); /** @@ -1158,11 +1319,9 @@ Pool.prototype.handleAddr = co(function* handleAddr(peer, packet) { addrs.length, this.hosts.size(), this.peers.size(), - peer.hostname); + peer.hostname()); this.fillOutbound(); - - this.emit('addr', packet, peer); }); /** @@ -1218,12 +1377,12 @@ Pool.prototype._handleInv = co(function* handleInv(peer, packet) { this.logger.debug( 'Received inv packet with %d items: blocks=%d txs=%d (%s).', - items.length, blocks.length, txs.length, peer.hostname); + items.length, blocks.length, txs.length, peer.hostname()); if (unknown !== -1) { this.logger.warning( 'Peer sent an unknown inv type: %d (%s).', - unknown, peer.hostname); + unknown, peer.hostname()); } if (blocks.length > 0) @@ -1231,8 +1390,6 @@ Pool.prototype._handleInv = co(function* handleInv(peer, packet) { if (txs.length > 0) yield this.handleTXInv(peer, txs); - - this.emit('inv', packet, peer); }); /** @@ -1251,7 +1408,7 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { return; // Ignore for now if we're still syncing - if (!this.chain.synced && !peer.isLoader()) + if (!this.chain.synced && !peer.loader) return; if (this.options.witness && !peer.hasWitness()) @@ -1264,14 +1421,14 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { this.logger.debug( 'Received %s block hashes from peer (%s).', hashes.length, - peer.hostname); + peer.hostname()); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; // Resolve orphan chain. if (this.chain.hasOrphan(hash)) { - this.logger.debug('Received known orphan hash (%s).', peer.hostname); + this.logger.debug('Received known orphan hash (%s).', peer.hostname()); yield this.resolveOrphan(peer, hash); continue; } @@ -1290,7 +1447,7 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { // from the last hash (this will reset // the hashContinue on the remote node). if (i === hashes.length - 1) { - this.logger.debug('Received existing hash (%s).', peer.hostname); + this.logger.debug('Received existing hash (%s).', peer.hostname()); yield this.getBlocks(peer, hash); } } @@ -1340,7 +1497,7 @@ Pool.prototype.handleGetData = co(function* handleGetData(peer, packet) { var i, j, item, tx, block, result, height; if (items.length > 50000) { - this.logger.warning('Peer sent a inv >50k items (%s).', peer.hostname); + this.logger.warning('Peer sent inv with >50k items (%s).', peer.hostname()); peer.increaseBan(100); peer.destroy(); return; @@ -1455,22 +1612,20 @@ Pool.prototype.handleGetData = co(function* handleGetData(peer, packet) { if (txs > 0) { this.logger.debug( 'Served %d txs with getdata (notfound=%d) (%s).', - txs, notFound.length, peer.hostname); + txs, notFound.length, peer.hostname()); } if (blocks > 0) { this.logger.debug( 'Served %d blocks with getdata (notfound=%d) (%s).', - blocks, notFound.length, peer.hostname); + blocks, notFound.length, peer.hostname()); } if (unknown !== -1) { this.logger.warning( 'Peer sent an unknown getdata type: %s (%d).', - unknown, peer.hostname); + unknown, peer.hostname()); } - - this.emit('getdata', packet, peer); }); /** @@ -1488,8 +1643,6 @@ Pool.prototype.handleNotFound = co(function* handleNotFound(peer, packet) { item = items[i]; this.fulfill(peer, item.hash); } - - this.emit('notfound', packet, peer); }); /** @@ -1535,8 +1688,6 @@ Pool.prototype.handleGetBlocks = co(function* handleGetBlocks(peer, packet) { } peer.sendInv(blocks); - - this.emit('getblocks', packet, peer); }); /** @@ -1586,8 +1737,6 @@ Pool.prototype.handleGetHeaders = co(function* handleGetHeaders(peer, packet) { } peer.sendHeaders(headers); - - this.emit('getheaders', packet, peer); }); /** @@ -1627,7 +1776,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { if (!this.syncing) return; - if (!peer.isLoader()) + if (!peer.loader) return; if (headers.length === 0) @@ -1641,7 +1790,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { if (this.headerChain.size > 40000) { this.logger.warning( 'Peer is sending too many headers (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); peer.destroy(); return; @@ -1656,13 +1805,17 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { height = last.height + 1; if (header.prevBlock !== last.hash) { - this.logger.warning('Peer sent a bad header chain (%s).', peer.hostname); + this.logger.warning( + 'Peer sent a bad header chain (%s).', + peer.hostname()); peer.destroy(); return; } if (!header.verify()) { - this.logger.warning('Peer sent an invalid header (%s).', peer.hostname); + this.logger.warning( + 'Peer sent an invalid header (%s).', + peer.hostname()); peer.increaseBan(100); peer.destroy(); return; @@ -1678,7 +1831,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { if (node.hash !== this.headerTip.hash) { this.logger.warning( 'Peer sent an invalid checkpoint (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); peer.destroy(); return; @@ -1693,9 +1846,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { this.logger.debug( 'Received %s headers from peer (%s).', headers.length, - peer.hostname); - - this.emit('headers', packet, peer); + peer.hostname()); // Request the hashes we just added. if (isWatermark) { @@ -1717,7 +1868,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { */ Pool.prototype.handleSendHeaders = co(function* handleSendHeaders(peer, packet) { - this.emit('sendheaders', packet, peer); + ; }); /** @@ -1732,7 +1883,7 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, packet) { if (this.options.spv) { this.logger.warning( 'Peer sent unsolicited block (%s).', - this.hostname); + peer.hostname()); return; } @@ -1776,7 +1927,7 @@ Pool.prototype._addBlock = co(function* addBlock(peer, block) { if (!this.fulfill(peer, hash)) { this.logger.warning( 'Received unrequested block: %s (%s).', - block.rhash(), peer.hostname); + block.rhash(), peer.hostname()); peer.destroy(); return; } @@ -1926,15 +2077,13 @@ Pool.prototype._handleTX = co(function* handleTX(peer, packet) { if (!this.fulfill(peer, hash)) { this.logger.warning( 'Peer sent unrequested tx: %s (%s).', - tx.txid(), peer.hostname); + tx.txid(), peer.hostname()); peer.destroy(); return; } - if (!this.mempool) { - this.emit('tx', tx, peer); + if (!this.mempool) return; - } try { missing = yield this.mempool.addTX(tx); @@ -1950,12 +2099,10 @@ Pool.prototype._handleTX = co(function* handleTX(peer, packet) { if (missing) { this.logger.debug( 'Requesting %d missing transactions (%s).', - missing.length, peer.hostname); + missing.length, peer.hostname()); this.getTX(peer, missing); } - - this.emit('tx', tx, peer); }); /** @@ -1970,14 +2117,12 @@ Pool.prototype.handleReject = co(function* handleReject(peer, packet) { this.logger.warning( 'Received reject (%s): msg=%s code=%s reason=%s hash=%s.', - peer.hostname, + peer.hostname(), packet.message, packet.getCode(), packet.reason, packet.rhash()); - this.emit('reject', packet, peer); - if (!packet.hash) return; @@ -2016,11 +2161,11 @@ Pool.prototype.handleMempool = co(function* handleMempool(peer, packet) { items.push(new InvItem(invTypes.TX, hash)); } - this.logger.debug('Sending mempool snapshot (%s).', peer.hostname); + this.logger.debug( + 'Sending mempool snapshot (%s).', + peer.hostname()); peer.queueInv(items); - - this.emit('mempool', packet, peer); }); /** @@ -2031,7 +2176,7 @@ Pool.prototype.handleMempool = co(function* handleMempool(peer, packet) { */ Pool.prototype.handleFilterLoad = co(function* handleFilterLoad(peer, packet) { - this.emit('filterload', packet, peer); + ; }); /** @@ -2042,7 +2187,7 @@ Pool.prototype.handleFilterLoad = co(function* handleFilterLoad(peer, packet) { */ Pool.prototype.handleFilterAdd = co(function* handleFilterAdd(peer, packet) { - this.emit('filteradd', packet, peer); + ; }); /** @@ -2053,7 +2198,7 @@ Pool.prototype.handleFilterAdd = co(function* handleFilterAdd(peer, packet) { */ Pool.prototype.handleFilterClear = co(function* handleFilterClear(peer, packet) { - this.emit('filterclear', packet, peer); + ; }); /** @@ -2091,7 +2236,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) if (!this.options.spv) { this.logger.warning( 'Peer sent unsolicited merkleblock (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); return; } @@ -2099,7 +2244,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) if (!peer.requestMap.has(hash)) { this.logger.warning( 'Peer sent an unrequested merkleblock (%s).', - peer.hostname); + peer.hostname()); peer.destroy(); return; } @@ -2107,7 +2252,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) if (peer.merkleBlock) { this.logger.warning( 'Peer sent a merkleblock prematurely (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); return; } @@ -2115,7 +2260,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) if (!block.verify()) { this.logger.warning( 'Peer sent an invalid merkleblock (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); return; } @@ -2128,8 +2273,6 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) peer.merkleTime = util.ms(); peer.merkleBlock = block; peer.merkleMatches = block.matches.length; - - this.emit('merkleblock', packet, peer); }); /** @@ -2140,7 +2283,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) */ Pool.prototype.handleFeeFilter = co(function* handleFeeFilter(peer, packet) { - this.emit('feefilter', packet, peer); + ; }); /** @@ -2151,7 +2294,7 @@ Pool.prototype.handleFeeFilter = co(function* handleFeeFilter(peer, packet) { */ Pool.prototype.handleSendCmpct = co(function* handleSendCmpct(peer, packet) { - this.emit('sendcmpct', packet, peer); + ; }); /** @@ -2171,26 +2314,23 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { return; if (!this.options.compact) { - this.logger.info('Peer sent unsolicited cmpctblock (%s).', peer.hostname); - return; - } - - if (!this.mempool) { - this.logger.warning('Requesting compact blocks without a mempool!'); + this.logger.info( + 'Peer sent unsolicited cmpctblock (%s).', + peer.hostname()); return; } if (peer.compactBlocks.has(hash)) { this.logger.debug( 'Peer sent us a duplicate compact block (%s).', - peer.hostname); + peer.hostname()); return; } if (this.compactBlocks.has(hash)) { this.logger.debug( 'Already waiting for compact block %s (%s).', - hash, peer.hostname); + hash, peer.hostname()); return; } @@ -2198,7 +2338,7 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { if (this.options.blockMode !== 1) { this.logger.warning( 'Peer sent us an unrequested compact block (%s).', - peer.hostname); + peer.hostname()); peer.destroy(); return; } @@ -2207,10 +2347,15 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { this.requestMap.insert(hash); } + if (!this.mempool) { + this.logger.warning('Requesting compact blocks without a mempool!'); + return; + } + if (!block.verify()) { this.logger.debug( 'Peer sent an invalid compact block (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); return; } @@ -2220,14 +2365,14 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { if (result) { this.logger.debug( 'Received full compact block %s (%s).', - block.rhash(), peer.hostname); + block.rhash(), peer.hostname()); yield this.addBlock(peer, block.toBlock()); return; } if (this.options.blockMode === 1) { if (peer.compactBlocks.size >= 10) { - this.logger.warning('Compact block DoS attempt (%s).', peer.hostname); + this.logger.warning('Compact block DoS attempt (%s).', peer.hostname()); peer.destroy(); return; } @@ -2240,11 +2385,9 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { this.logger.debug( 'Received semi-full compact block %s (%s).', - block.rhash(), peer.hostname); + block.rhash(), peer.hostname()); peer.send(new packets.GetBlockTxnPacket(block.toRequest())); - - this.emit('cmpctblock', packet, peer); }); /** @@ -2274,7 +2417,7 @@ Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, packet) if (!block) { this.logger.debug( 'Peer sent getblocktxn for non-existent block (%s).', - peer.hostname); + peer.hostname()); peer.increaseBan(100); return; } @@ -2284,15 +2427,13 @@ Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, packet) if (height < this.chain.tip.height - 15) { this.logger.debug( 'Peer sent a getblocktxn for a block > 15 deep (%s)', - peer.hostname); + peer.hostname()); return; } res = BIP152.TXResponse.fromBlock(block, req); peer.send(new packets.BlockTxnPacket(res, peer.compactWitness)); - - this.emit('getblocktxn', packet, peer); }); /** @@ -2307,7 +2448,9 @@ Pool.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, packet) { var block = peer.compactBlocks.get(res.hash); if (!block) { - this.logger.debug('Peer sent unsolicited blocktxn (%s).', peer.hostname); + this.logger.debug( + 'Peer sent unsolicited blocktxn (%s).', + peer.hostname()); return; } @@ -2318,17 +2461,17 @@ Pool.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, packet) { if (!block.fillMissing(res)) { peer.increaseBan(100); - this.logger.warning('Peer sent non-full blocktxn (%s).', peer.hostname); + this.logger.warning( + 'Peer sent non-full blocktxn (%s).', + peer.hostname()); return; } this.logger.debug( 'Filled compact block %s (%s).', - block.rhash(), peer.hostname); + block.rhash(), peer.hostname()); yield this.addBlock(peer, block.toBlock()); - - this.emit('blocktxn', packet, peer); }); /** @@ -2339,7 +2482,7 @@ Pool.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, packet) { */ Pool.prototype.handleEncinit = co(function* handleEncinit(peer, packet) { - this.emit('encinit', packet, peer); + ; }); /** @@ -2350,7 +2493,7 @@ Pool.prototype.handleEncinit = co(function* handleEncinit(peer, packet) { */ Pool.prototype.handleEncack = co(function* handleEncack(peer, packet) { - this.emit('encack', packet, peer); + ; }); /** @@ -2361,7 +2504,7 @@ Pool.prototype.handleEncack = co(function* handleEncack(peer, packet) { */ Pool.prototype.handleAuthChallenge = co(function* handleAuthChallenge(peer, packet) { - this.emit('authchallenge', packet, peer); + ; }); /** @@ -2372,7 +2515,7 @@ Pool.prototype.handleAuthChallenge = co(function* handleAuthChallenge(peer, pack */ Pool.prototype.handleAuthReply = co(function* handleAuthReply(peer, packet) { - this.emit('authreply', packet, peer); + ; }); /** @@ -2383,7 +2526,7 @@ Pool.prototype.handleAuthReply = co(function* handleAuthReply(peer, packet) { */ Pool.prototype.handleAuthPropose = co(function* handleAuthPropose(peer, packet) { - this.emit('authpropose', packet, peer); + ; }); /** @@ -2394,8 +2537,9 @@ Pool.prototype.handleAuthPropose = co(function* handleAuthPropose(peer, packet) */ Pool.prototype.handleUnknown = co(function* handleUnknown(peer, packet) { - this.logger.warning('Unknown packet: %s (%s).', packet.cmd, peer.hostname); - this.emit('unknown', packet, peer); + this.logger.warning( + 'Unknown packet: %s (%s).', + packet.cmd, peer.hostname()); }); /** @@ -2412,13 +2556,11 @@ Pool.prototype.addInbound = function addInbound(socket) { return; } - peer = this.acceptPeer(socket); + peer = this.createInbound(socket); - this.logger.info('Added inbound peer (%s).', peer.hostname); + this.logger.info('Added inbound peer (%s).', peer.hostname()); this.peers.add(peer); - - this.emit('peer', peer); }; /** @@ -2485,7 +2627,8 @@ Pool.prototype.addOutbound = function addOutbound() { if (this.peers.outbound >= this.options.maxOutbound) return; - // Hang back if we don't have a loader peer yet. + // Hang back if we don't + // have a loader peer yet. if (!this.peers.load) return; @@ -2494,7 +2637,7 @@ Pool.prototype.addOutbound = function addOutbound() { if (!addr) return; - peer = this.createPeer(addr); + peer = this.createOutbound(addr); this.peers.add(peer); @@ -2551,6 +2694,7 @@ Pool.prototype.removePeer = function removePeer(peer) { var i, hashes, hash; this.peers.remove(peer); + this.nonces.remove(peer.hostname()); hashes = peer.requestMap.keys(); @@ -2757,7 +2901,7 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) { 'Requesting %d/%d blocks from peer with getdata (%s).', items.length, this.requestMap.size, - peer.hostname); + peer.hostname()); peer.getBlock(items); }; @@ -2800,7 +2944,7 @@ Pool.prototype.getTX = function getTX(peer, hashes) { 'Requesting %d/%d txs from peer with getdata (%s).', items.length, this.requestMap.size, - peer.hostname); + peer.hostname()); peer.getTX(items); }; @@ -2996,6 +3140,9 @@ function PoolOptions(options) { this.chain = null; this.mempool = null; + this.nonces = new NonceList(); + this.address = new NetAddress(); + this.witness = this.network.witness; this.useCheckpoints = false; this.spv = false; @@ -3257,6 +3404,9 @@ PoolOptions.prototype.fromOptions = function fromOptions(options) { this.requiredServices = options.requiredServices; } + this.address.fromOptions(this); + this.address.ts = this.network.now(); + return this; }; @@ -3270,6 +3420,69 @@ PoolOptions.fromOptions = function fromOptions(options) { return new PoolOptions().fromOptions(options); }; +/** + * Get the chain height. + * @private + * @returns {Number} + */ + +PoolOptions.prototype.getHeight = function getHeight() { + return this.chain.height; +}; + +/** + * Test whether the chain is synced. + * @private + * @returns {Boolean} + */ + +PoolOptions.prototype.isFull = function isFull() { + return this.chain.synced; +}; + +/** + * Create a version packet nonce. + * @private + * @param {String} hostname + * @returns {Buffer} + */ + +PoolOptions.prototype.createNonce = function createNonce(hostname) { + return this.nonces.alloc(hostname); +}; + +/** + * Test whether version nonce is ours. + * @private + * @param {Buffer} nonce + * @returns {Boolean} + */ + +PoolOptions.prototype.hasNonce = function hasNonce(nonce) { + return this.nonces.has(nonce); +}; + +/** + * Get fee rate for txid. + * @private + * @param {Hash} hash + * @returns {Rate} + */ + +PoolOptions.prototype.getRate = function getRate(hash) { + var entry; + + if (!this.mempool) + return -1; + + entry = this.mempool.getEntry(hash); + + if (!entry) + return -1; + + return entry.getRate(); +}; + /** * Peer List * @constructor @@ -3319,8 +3532,8 @@ PeerList.prototype.size = function size() { PeerList.prototype.add = function add(peer) { assert(this.list.push(peer)); - assert(!this.map[peer.hostname]); - this.map[peer.hostname] = peer; + assert(!this.map[peer.hostname()]); + this.map[peer.hostname()] = peer; if (peer.outbound) this.outbound++; @@ -3336,11 +3549,14 @@ PeerList.prototype.add = function add(peer) { PeerList.prototype.remove = function remove(peer) { assert(this.list.remove(peer)); - assert(this.map[peer.hostname]); - delete this.map[peer.hostname]; + assert(this.map[peer.hostname()]); + delete this.map[peer.hostname()]; - if (peer.isLoader()) + if (peer === this.load) { + assert(this.load === peer); + peer.loader = false; this.load = null; + } if (peer.outbound) this.outbound--; @@ -3368,25 +3584,6 @@ PeerList.prototype.has = function has(hostname) { return this.map[hostname] != null; }; -/** - * Get peers by host. - * @param {String} host - * @returns {Peer[]} - */ - -PeerList.prototype.getByHost = function getByHost(host) { - var peers = []; - var peer; - - for (peer = this.list.head; peer; peer = peer.next) { - if (peer.host !== host) - continue; - peers.push(peer); - } - - return peers; -}; - /** * Destroy peer list (kills peers). */ @@ -3596,8 +3793,52 @@ BroadcastItem.prototype.inspect = function inspect() { + '>'; }; -/* - * Helpers +/** + * NonceList + * @constructor + */ + +function NonceList() { + this.nonces = {}; + this.hosts = {}; +} + +NonceList.prototype.alloc = function alloc(hostname) { + var nonce = util.nonce(); + var key = nonce.toString('hex'); + this.nonces[key] = hostname; + this.hosts[hostname] = key; + return nonce; +}; + +NonceList.prototype.has = function has(nonce) { + var key = nonce.toString('hex'); + var hostname = this.nonces[key]; + + if (!hostname) + return false; + + delete this.nonces[key]; + delete this.hosts[hostname]; + + return true; +}; + +NonceList.prototype.remove = function remove(hostname) { + var key = this.hosts[hostname]; + + if (!key) + return false; + + delete this.nonces[key]; + delete this.hosts[hostname]; + + return true; +}; + +/** + * BlockNode + * @constructor */ function BlockNode(hash, height) { @@ -3607,10 +3848,6 @@ function BlockNode(hash, height) { this.next = null; } -function sortNodes(a, b) { - return a.height - b.height; -} - /* * Expose */ diff --git a/lib/protocol/network.js b/lib/protocol/network.js index cfade27c..dc124dab 100644 --- a/lib/protocol/network.js +++ b/lib/protocol/network.js @@ -30,8 +30,9 @@ function Network(options) { this.seeds = options.seeds; this.magic = options.magic; this.port = options.port; - this.checkpoints = options.checkpoints; + this.checkpointMap = options.checkpointMap; this.lastCheckpoint = options.lastCheckpoint; + this.checkpoints = []; this.halvingInterval = options.halvingInterval; this.genesis = options.genesis; this.genesisBlock = options.genesisBlock; @@ -92,7 +93,7 @@ Network.simnet = null; Network.prototype._init = function _init() { var bits = 0; - var i, deployment; + var i, deployment, keys, key, hash, height; for (i = 0; i < this.deploys.length; i++) { deployment = this.deploys[i]; @@ -102,6 +103,18 @@ Network.prototype._init = function _init() { bits |= consensus.VERSION_TOP_MASK; this.unknownBits = ~bits; + + keys = Object.keys(this.checkpointMap); + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + hash = this.checkpointMap[key]; + height = +key; + + this.checkpoints.push({ hash: hash, height: height }); + } + + this.checkpoints.sort(cmpNode); }; /** @@ -298,6 +311,10 @@ function cmpBit(a, b) { return a.bit - b; } +function cmpNode(a, b) { + return a.height - b.height; +} + /* * Expose */ diff --git a/lib/protocol/networks.js b/lib/protocol/networks.js index 43577837..61166706 100644 --- a/lib/protocol/networks.js +++ b/lib/protocol/networks.js @@ -77,7 +77,7 @@ main.port = 8333; * @const {Object} */ -main.checkpoints = { +main.checkpointMap = { 11111: '1d7c6eb2fd42f55925e92efad68b61edd22fba29fde8783df744e26900000000', 33333: 'a6d0b5df7d0df069ceb1e736a216ad187a50b07aaa4e78748a58d52d00000000', 74000: '201a66b853f9e7814a820e2af5f5dc79c07144e31ce4c9a39339570000000000', @@ -497,7 +497,7 @@ testnet.magic = 0x0709110b; testnet.port = 18333; -testnet.checkpoints = { +testnet.checkpointMap = { 546: '70cb6af7ebbcb1315d3414029c556c55f3e2fc353c4c9063a76c932a00000000', // Custom checkpoints 10000: '02a1b43f52591e53b660069173ac83b675798e12599dbb0442b7580000000000', @@ -668,7 +668,7 @@ regtest.magic = 0xdab5bffa; regtest.port = 48444; -regtest.checkpoints = {}; +regtest.checkpointMap = {}; regtest.lastCheckpoint = 0; regtest.halvingInterval = 150; @@ -819,7 +819,7 @@ segnet4.magic = 0xc4a1abdc; segnet4.port = 28901; -segnet4.checkpoints = {}; +segnet4.checkpointMap = {}; segnet4.lastCheckpoint = 0; segnet4.halvingInterval = 210000; @@ -963,7 +963,7 @@ simnet.magic = 0x12141c16; simnet.port = 18555; -simnet.checkpoints = {}; +simnet.checkpointMap = {}; simnet.lastCheckpoint = 0; diff --git a/lib/utils/asyncemitter.js b/lib/utils/asyncemitter.js new file mode 100644 index 00000000..476131ef --- /dev/null +++ b/lib/utils/asyncemitter.js @@ -0,0 +1,315 @@ +/*! + * asyncemitter.js - event emitter which resolves promises. + * Copyright (c) 2014-2016, Christopher Jeffrey (MIT License). + * https://github.com/bcoin-org/bcoin + */ + +'use strict'; + +var assert = require('assert'); +var co = require('./co'); + +/** + * Represents a promise-resolving event emitter. + * @see EventEmitter + * @constructor + */ + +function AsyncEmitter() { + if (!(this instanceof AsyncEmitter)) + return new AsyncEmitter(); + + this._events = Object.create(null); +} + +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.addListener = function addListener(type, handler) { + return this._push(type, handler, false); +}; + +/** + * Add a listener. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.on = function on(type, handler) { + return this.addListener(type, handler); +}; + +/** + * Add a listener to execute once. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.once = function once(type, handler) { + return this._push(type, handler, true); +}; + +/** + * Prepend a listener. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.prependListener = function prependListener(type, handler) { + return this._unshift(type, handler, false); +}; + +/** + * Prepend a listener to execute once. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.prependOnceListener = function prependOnceListener(type, handler) { + return this._unshift(type, handler, true); +}; + +/** + * Push a listener. + * @private + * @param {String} type + * @param {Function} handler + * @param {Boolean} once + */ + +AsyncEmitter.prototype._push = function _push(type, handler, once) { + assert(typeof type === 'string', '`type` must be a string.'); + + if (!this._events[type]) + this._events[type] = []; + + this._events[type].push(new Listener(handler, once)); + + this.tryEmit('newListener', type, handler); +}; + +/** + * Unshift a listener. + * @param {String} type + * @param {Function} handler + * @param {Boolean} once + */ + +AsyncEmitter.prototype._unshift = function _unshift(type, handler, once) { + assert(typeof type === 'string', '`type` must be a string.'); + + if (!this._events[type]) + this._events[type] = []; + + this._events[type].unshift(new Listener(handler, once)); + + this.tryEmit('newListener', type, handler); +}; + +/** + * Remove a listener. + * @param {String} type + * @param {Function} handler + */ + +AsyncEmitter.prototype.removeListener = function removeListener(type, handler) { + var i, listeners, listener; + var index = -1; + + assert(typeof type === 'string', '`type` must be a string.'); + + listeners = this._events[type]; + + if (!listeners) + return; + + for (i = 0; i < listeners.length; i++) { + listener = listeners[i]; + if (listener.handler === handler) { + index = i; + break; + } + } + + if (index === -1) + return; + + listeners.splice(index, 1); + + if (listeners.length === 0) + delete this._events[type]; + + this.tryEmit('removeListener', type, handler); +}; + +/** + * Set max listeners. + * @param {Number} max + */ + +AsyncEmitter.prototype.setMaxListeners = function setMaxListeners(max) { + assert(typeof max === 'number', '`max` must be a number.'); + assert(max >= 0, '`max` must be non-negative.'); + assert(max % 1 === 0, '`max` must be an integer.'); +}; + +/** + * Remove all listeners. + * @param {String?} type + */ + +AsyncEmitter.prototype.removeAllListeners = function removeAllListeners(type) { + if (arguments.length === 0) { + this._events = Object.create(null); + return; + } + + assert(typeof type === 'string', '`type` must be a string.'); + + delete this._events[type]; +}; + +/** + * Get listeners array. + * @param {String} type + * @returns {Function[]} + */ + +AsyncEmitter.prototype.listeners = function listeners(type) { + var i, listeners, listener; + var result = []; + + assert(typeof type === 'string', '`type` must be a string.'); + + listeners = this._events[type]; + + if (!listeners) + return result; + + for (i = 0; i < listeners.length; i++) { + listener = listeners[i]; + result.push(listener.handler); + } + + return result; +}; + +/** + * Get listener count for an event. + * @param {String} type + */ + +AsyncEmitter.prototype.listenerCount = function listenerCount(type) { + var listeners; + + assert(typeof type === 'string', '`type` must be a string.'); + + listeners = this._events[type]; + + if (!listeners) + return 0; + + return listeners.length; +}; + +/** + * Emit an event. Wait for promises to resolve. + * @param {String} type + * @param {...Object} args + * @returns {Promise} + */ + +AsyncEmitter.prototype.emit = co(function* emit(type) { + var i, j, listeners, error, err, args, listener, handler; + + assert(typeof type === 'string', '`type` must be a string.'); + + listeners = this._events[type]; + + if (!listeners || listeners.length === 0) { + if (type === 'error') { + error = arguments[1]; + + if (error instanceof Error) + throw error; + + err = new Error('Uncaught, unspecified "error" event. (' + error + ')'); + err.context = error; + throw err; + } + return; + } + + for (i = 0; i < listeners.length; i++) { + listener = listeners[i]; + handler = listener.handler; + + if (listener.once) { + listeners.splice(i, 1); + i--; + } + + switch (arguments.length) { + case 1: + yield handler(); + break; + case 2: + yield handler(arguments[1]); + break; + case 3: + yield handler(arguments[1], arguments[2]); + break; + case 4: + yield handler(arguments[1], arguments[2], arguments[3]); + break; + default: + if (!args) { + args = new Array(arguments.length - 1); + for (j = 1; j < arguments.length; j++) + args[j - 1] = arguments[j]; + } + yield handler.apply(null, args); + break; + } + } +}); + +/** + * Emit an event. Ignore rejections. + * @param {String} type + * @param {...Object} args + * @returns {Promise} + */ + +AsyncEmitter.prototype.tryEmit = co(function* tryEmit() { + try { + yield this.emit.apply(this, arguments); + } catch (e) { + ; + } +}); + +/** + * Event Listener + * @constructor + * @param {Function} handler + * @param {Boolean} once + * @property {Function} handler + * @property {Boolean} once + */ + +function Listener(handler, once) { + assert(typeof handler === 'function', '`handler` must be a function.'); + assert(typeof once === 'boolean', '`once` must be a function.'); + this.handler = handler; + this.once = once; +} + +/* + * Expose + */ + +module.exports = AsyncEmitter;