From cd8e4640797d36f2abc21d93c8f1e1c01f5375fc Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Wed, 4 Jan 2017 17:31:32 -0800 Subject: [PATCH] net: redesign stall behavior and block management. --- lib/net/bip152.js | 34 -- lib/net/packets.js | 93 +++-- lib/net/peer.js | 780 ++++++++++++++++++++++---------------- lib/net/pool.js | 770 +++++++++++++------------------------ lib/primitives/invitem.js | 9 + package.json | 1 + 6 files changed, 774 insertions(+), 913 deletions(-) diff --git a/lib/net/bip152.js b/lib/net/bip152.js index 821d6c13..388f837d 100644 --- a/lib/net/bip152.js +++ b/lib/net/bip152.js @@ -11,7 +11,6 @@ var BufferReader = require('../utils/reader'); var BufferWriter = require('../utils/writer'); var StaticWriter = require('../utils/staticwriter'); var encoding = require('../utils/encoding'); -var co = require('../utils/co'); var crypto = require('../crypto/crypto'); var assert = require('assert'); var constants = require('../protocol/constants'); @@ -45,8 +44,6 @@ function CompactBlock(options) { this.idMap = {}; this.count = 0; this.sipKey = null; - this.timeout = null; - this.callback = null; if (options) this.fromOptions(options); @@ -404,37 +401,6 @@ CompactBlock.fromBlock = function fromBlock(block, witness, nonce) { return new CompactBlock().fromBlock(block, witness, nonce); }; -CompactBlock.prototype.wait = function wait(time) { - var self = this; - return new Promise(function(resolve, reject) { - self._wait(time, co.wrap(resolve, reject)); - }); -}; - -CompactBlock.prototype._wait = function wait(time, callback) { - var self = this; - assert(this.timeout == null); - this.callback = callback; - this.timeout = setTimeout(function() { - self.complete(new Error('Timed out.')); - }, time); -}; - -CompactBlock.prototype.complete = function complete(err) { - if (this.timeout != null) { - clearTimeout(this.timeout); - this.timeout = null; - this.callback(err); - } -}; - -CompactBlock.prototype.destroy = function destroy() { - if (this.timeout != null) { - clearTimeout(this.timeout); - this.timeout = null; - } -}; - CompactBlock.prototype.toHeaders = function toHeaders() { return Headers.fromBlock(this); }; diff --git a/lib/net/packets.js b/lib/net/packets.js index 34f3f571..e0f6dc9c 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -37,42 +37,53 @@ var DUMMY = new Buffer(0); exports.types = { VERSION: 0, VERACK: 1, - PING: 1, - PONG: 2, - ALERT: 3, - GETADDR: 4, - ADDR: 5, - INV: 6, - GETDATA: 7, - NOTFOUND: 8, - GETBLOCKS: 9, - GETHEADERS: 10, - HEADERS: 11, - SENDHEADERS: 12, - BLOCK: 13, - TX: 14, - REJECT: 15, - MEMPOOL: 16, - FILTERLOAD: 17, - FILTERADD: 18, - FILTERCLEAR: 19, - MERKLEBLOCK: 20, - GETUTXOS: 21, - UTXOS: 22, - HAVEWITNESS: 23, - FEEFILTER: 24, - SENDCMPCT: 25, - CMPCTBLOCK: 26, - GETBLOCKTXN: 27, - BLOCKTXN: 28, - ENCINIT: 29, - ENCACK: 30, - AUTHCHALLENGE: 31, - AUTHREPLY: 32, - AUTHPROPOSE: 33, - UNKNOWN: 34 + PING: 2, + PONG: 3, + ALERT: 4, + GETADDR: 5, + ADDR: 6, + INV: 7, + GETDATA: 8, + NOTFOUND: 9, + GETBLOCKS: 10, + GETHEADERS: 11, + HEADERS: 12, + SENDHEADERS: 13, + BLOCK: 14, + TX: 15, + REJECT: 16, + MEMPOOL: 17, + FILTERLOAD: 18, + FILTERADD: 19, + FILTERCLEAR: 20, + MERKLEBLOCK: 21, + GETUTXOS: 22, + UTXOS: 23, + HAVEWITNESS: 24, + FEEFILTER: 25, + SENDCMPCT: 26, + CMPCTBLOCK: 27, + GETBLOCKTXN: 28, + BLOCKTXN: 29, + ENCINIT: 30, + ENCACK: 31, + AUTHCHALLENGE: 32, + AUTHREPLY: 33, + AUTHPROPOSE: 34, + UNKNOWN: 35, + // Internal + INTERNAL: 100, + DATA: 101 }; +/** + * Packet types by value. + * @const {Object} + * @default + */ + +exports.typesByVal = util.revMap(exports.types); + /** * Base Packet * @constructor @@ -141,7 +152,7 @@ Packet.prototype.fromRaw = function fromRaw(data) { * @param {Buffer} options.nonce * @param {String} options.agent - User agent string. * @param {Number} options.height - Chain height. - * @param {Boolean} options.relay - Whether transactions + * @param {Boolean} options.noRelay - Whether transactions * should be relayed immediately. * @property {Number} version - Protocol version. * @property {Number} services - Service bits. @@ -151,7 +162,7 @@ Packet.prototype.fromRaw = function fromRaw(data) { * @property {Buffer} nonce * @property {String} agent - User agent string. * @property {Number} height - Chain height. - * @property {Boolean} relay - Whether transactions + * @property {Boolean} noRelay - Whether transactions * should be relayed immediately. */ @@ -169,7 +180,7 @@ function VersionPacket(options) { this.nonce = constants.ZERO_U64; this.agent = constants.USER_AGENT; this.height = 0; - this.relay = true; + this.noRelay = false; if (options) this.fromOptions(options); @@ -211,8 +222,8 @@ VersionPacket.prototype.fromOptions = function fromOptions(options) { if (options.height != null) this.height = options.height; - if (options.relay != null) - this.relay = options.relay; + if (options.noRelay != null) + this.noRelay = options.noRelay; return this; }; @@ -258,7 +269,7 @@ VersionPacket.prototype.toWriter = function toWriter(bw) { bw.writeBytes(this.nonce); bw.writeVarString(this.agent, 'ascii'); bw.write32(this.height); - bw.writeU8(this.relay ? 1 : 0); + bw.writeU8(this.noRelay ? 0 : 1); return bw; }; @@ -366,7 +377,7 @@ VersionPacket.prototype.fromReader = function fromReader(br) { this.height = br.read32(); if (br.left() > 0) - this.relay = br.readU8() === 1; + this.noRelay = br.readU8() === 0; if (this.version === 10300) this.version = 300; diff --git a/lib/net/peer.js b/lib/net/peer.js index 1091c860..cbd05075 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -24,7 +24,6 @@ var BIP152 = require('./bip152'); var Block = require('../primitives/block'); var TX = require('../primitives/tx'); var errors = require('../btc/errors'); -var List = require('../utils/list'); var NetAddress = require('../primitives/netaddress'); var invTypes = constants.inv; var packetTypes = packets.types; @@ -58,7 +57,7 @@ var VerifyResult = errors.VerifyResult; * @property {Hash?} hashContinue - The block hash at which to continue * the sync for the peer. * @property {Bloom?} spvFilter - The _peer's_ bloom spvFilter. - * @property {Boolean} relay - Whether to relay transactions + * @property {Boolean} noRelay - Whether to relay transactions * immediately to the peer. * @property {BN} challenge - Local nonce. * @property {Number} lastPong - Timestamp for last `pong` @@ -89,9 +88,9 @@ function Peer(pool) { this.socket = null; this.outbound = false; this.address = new NetAddress(); + this.connected = false; this.destroyed = false; this.ack = false; - this.connected = false; this.ts = 0; this.lastSend = 0; this.lastRecv = 0; @@ -105,11 +104,11 @@ function Peer(pool) { this.haveWitness = false; this.hashContinue = null; this.spvFilter = null; - this.relay = true; + this.noRelay = false; this.feeRate = -1; this.bip151 = null; this.bip150 = null; - this.compactMode = null; + this.compactMode = -1; this.compactWitness = false; this.compactBlocks = {}; this.lastMerkle = null; @@ -121,21 +120,19 @@ function Peer(pool) { this.lastPong = -1; this.lastPing = -1; this.minPing = -1; + this.lastBlock = -1; this.connectTimeout = null; this.pingTimer = null; - this.pingInterval = 30000; this.stallTimer = null; - this.stallInterval = 5000; this.addrFilter = new Bloom.Rolling(5000, 0.001); this.invFilter = new Bloom.Rolling(50000, 0.000001); - this.requestTimeout = 10000; this.requestMap = {}; + this.queueMap = {}; - this.queueBlock = new List(); - this.queueTX = new List(); + this.responseMap = {}; if (this.options.bip151) { this.bip151 = new BIP151(); @@ -158,6 +155,12 @@ function Peer(pool) { util.inherits(Peer, EventEmitter); +Peer.DRAIN_MAX = 5 << 20; +Peer.DRAIN_TIMEOUT = 10000; +Peer.STALL_INTERVAL = 5000; +Peer.PING_INTERVAL = 30000; +Peer.RESPONSE_TIMEOUT = 30000; + Peer.prototype.__defineGetter__('host', function() { return this.address.host; }); @@ -241,6 +244,7 @@ Peer.prototype.bind = function bind(socket) { if (self.maybeStall()) return; + self.lastRecv = util.ms(); self.parser.feed(chunk); }); }; @@ -324,17 +328,9 @@ Peer.prototype.open = co(function* open() { */ Peer.prototype._open = co(function* open() { - // Mark address attempt. - this.markAttempt(); - // Connect to peer. yield this.initConnect(); yield this.initStall(); - - // Mark address success. - this.markSuccess(); - - // Handshake. yield this.initBIP151(); yield this.initBIP150(); yield this.initVersion(); @@ -342,9 +338,6 @@ Peer.prototype._open = co(function* open() { assert(!this.destroyed); - // Mark address ack. - this.markAck(); - // Finally we can let the pool know // that this peer is ready to go. this.emit('open'); @@ -404,9 +397,11 @@ Peer.prototype.initConnect = function initConnect() { Peer.prototype.initStall = function initStall() { var self = this; assert(!this.stallTimer); + assert(!this.destroyed); this.stallTimer = setInterval(function() { self.maybeStall(); - }, this.stallInterval); + self.maybeTimeout(); + }, Peer.STALL_INTERVAL); return Promise.resolve(); }; @@ -468,9 +463,7 @@ Peer.prototype.initBIP150 = co(function* initBIP150() { yield this.bip150.wait(3000); - if (this.destroyed) - throw new Error('Peer was destroyed during BIP150 handshake.'); - + assert(!this.destroyed); assert(this.bip150.completed); if (this.bip150.auth) { @@ -486,6 +479,8 @@ Peer.prototype.initBIP150 = co(function* initBIP150() { */ Peer.prototype.initVersion = co(function* initVersion() { + assert(!this.destroyed); + // Say hello. this.sendVersion(); @@ -496,10 +491,10 @@ Peer.prototype.initVersion = co(function* initVersion() { this.send(new packets.AddrPacket([this.pool.address])); } - yield this.request('verack'); - - if (this.destroyed) - throw new Error('Peer was destroyed during version handshake.'); + if (!this.ack) { + yield this.wait(packetTypes.VERACK, 10000); + assert(this.ack); + } // Wait for _their_ version. if (!this.version) { @@ -507,56 +502,11 @@ Peer.prototype.initVersion = co(function* initVersion() { 'Peer sent a verack without a version (%s).', this.hostname); - yield this.request('version'); - - if (this.destroyed) - throw new Error('Peer was destroyed during version handshake.'); + yield this.wait(packetTypes.VERSION, 10000); assert(this.version); } - if (!this.network.selfConnect) { - if (util.equal(this.version.nonce, this.pool.localNonce)) - throw new Error('We connected to ourself. Oops.'); - } - - if (this.version.version < constants.MIN_VERSION) - throw new Error('Peer does not support required protocol version.'); - - if (this.options.witness) { - this.haveWitness = this.version.hasWitness(); - if (!this.haveWitness && this.network.oldWitness) { - try { - yield this.request('havewitness'); - this.haveWitness = true; - } catch (err) { - ; - } - } - } - - if (this.outbound) { - if (!this.version.hasNetwork()) - throw new Error('Peer does not support network services.'); - - if (this.options.headers) { - if (!this.version.hasHeaders()) - throw new Error('Peer does not support getheaders.'); - } - - if (this.options.spv) { - if (!this.version.hasBloom()) - throw new Error('Peer does not support BIP37.'); - } - - if (this.options.witness) { - if (!this.haveWitness) - throw new Error('Peer does not support segregated witness.'); - } - } - - this.ack = true; - this.logger.debug('Received verack (%s).', this.hostname); }); @@ -568,10 +518,12 @@ Peer.prototype.initVersion = co(function* initVersion() { Peer.prototype.finalize = co(function* finalize() { var self = this; + assert(!this.destroyed); + // Setup the ping interval. this.pingTimer = setInterval(function() { self.sendPing(); - }, this.pingInterval); + }, Peer.PING_INTERVAL); // Ask for headers-only. if (this.options.headers) { @@ -649,7 +601,7 @@ Peer.prototype.announceBlock = function announceBlock(blocks) { // Send them the block immediately if // they're using compact block mode 1. - if (this.compactMode && this.compactMode.mode === 1) { + if (this.compactMode === 1) { this.invFilter.add(block.hash()); this.sendCompactBlock(block, this.compactWitness); continue; @@ -690,7 +642,7 @@ Peer.prototype.announceTX = function announceTX(txs) { // Do not send txs to spv clients // that have relay unset. - if (!this.relay) + if (this.noRelay) return; if (!Array.isArray(txs)) @@ -701,6 +653,10 @@ Peer.prototype.announceTX = function announceTX(txs) { assert(tx instanceof TX); + // Don't send if they already have it. + if (this.invFilter.test(tx.hash())) + continue; + // Check the peer's bloom // filter if they're using spv. if (this.spvFilter) { @@ -716,10 +672,6 @@ Peer.prototype.announceTX = function announceTX(txs) { continue; } - // Don't send if they already have it. - if (this.invFilter.test(tx.hash())) - continue; - inv.push(tx.toInv()); } @@ -731,18 +683,22 @@ Peer.prototype.announceTX = function announceTX(txs) { */ Peer.prototype.announceList = function announceList() { - var txs = []; var blocks = []; - var item; + var txs = []; + var hashes = Object.keys(this.pool.invMap); + var i, hash, item; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + item = this.pool.invMap[hash]; - for (item = this.pool.invItems.head; item; item = item.next) { switch (item.type) { - case invTypes.TX: - txs.push(item.msg); - break; case invTypes.BLOCK: blocks.push(item.msg); break; + case invTypes.TX: + txs.push(item.msg); + break; default: assert(false, 'Bad item type.'); break; @@ -837,7 +793,7 @@ Peer.prototype.sendVersion = function sendVersion() { packet.nonce = this.pool.localNonce; packet.agent = constants.USER_AGENT; packet.height = this.chain.height; - packet.relay = this.options.relay; + packet.noRelay = this.options.noRelay; this.send(packet); }; @@ -909,7 +865,8 @@ Peer.prototype.sendFeeRate = function sendFeeRate(rate) { */ Peer.prototype.destroy = function destroy() { - var i, keys, cmd, queue, entry, hash; + var connected = this.connected; + var i, keys, cmd, entry, hash; if (this.destroyed) return; @@ -943,14 +900,13 @@ Peer.prototype.destroy = function destroy() { this.connectTimeout = null; } - keys = Object.keys(this.requestMap); + keys = Object.keys(this.responseMap); for (i = 0; i < keys.length; i++) { cmd = keys[i]; - queue = this.requestMap[cmd]; - for (entry = queue.head; entry; entry = entry.next) - entry.stop(); - delete this.requestMap[cmd]; + entry = this.responseMap[cmd]; + delete this.responseMap[cmd]; + entry.reject(new Error('Peer was destroyed.')); } keys = Object.keys(this.compactBlocks); @@ -958,12 +914,13 @@ Peer.prototype.destroy = function destroy() { for (i = 0; i < keys.length; i++) { hash = keys[i]; entry = this.compactBlocks[hash]; + delete this.compactBlocks[hash]; entry.destroy(); } this.locker.destroy(); - this.emit('close'); + this.emit('close', connected); }; /** @@ -985,8 +942,7 @@ Peer.prototype.write = function write(data) { /** * Add to drain counter. * @private - * @param {Function} resolve - * @param {Function} reject + * @param {Number} size */ Peer.prototype.needsDrain = function needsDrain(size) { @@ -996,10 +952,10 @@ Peer.prototype.needsDrain = function needsDrain(size) { return; } - this.drainStart = util.now(); + this.drainStart = util.ms(); this.drainSize += size; - if (this.drainSize >= (5 << 20)) { + if (this.drainSize >= Peer.DRAIN_MAX) { this.logger.warning( 'Peer is not reading: %dmb buffered (%s).', util.mb(this.drainSize), @@ -1018,16 +974,163 @@ Peer.prototype.maybeStall = function maybeStall() { if (this.drainSize === 0) return false; - if (util.now() < this.drainStart + 10) + if (util.ms() < this.drainStart + Peer.DRAIN_TIMEOUT) return false; this.drainSize = 0; - this.error('Peer stalled.'); + this.error('Peer stalled (write).'); this.destroy(); return true; }; +/** + * Potentially add response timeout. + * @private + * @param {Packet} packet + */ + +Peer.prototype.addTimeout = function addTimeout(packet) { + var timeout = Peer.RESPONSE_TIMEOUT; + + switch (packet.type) { + case packetTypes.MEMPOOL: + case packetTypes.GETBLOCKS: + this.request(packetTypes.INV, timeout); + break; + case packetTypes.GETHEADERS: + this.request(packetTypes.HEADERS, timeout * 2); + break; + case packetTypes.GETDATA: + this.request(packetTypes.DATA, timeout); + break; + case packetTypes.GETBLOCKTXN: + this.request(packetTypes.BLOCKTXN, timeout); + break; + } +}; + +/** + * Potentially finish response timeout. + * @private + * @param {Packet} packet + */ + +Peer.prototype.fulfill = function fulfill(packet) { + var entry; + + switch (packet.type) { + case packetTypes.BLOCK: + case packetTypes.CMPCTBLOCK: + case packetTypes.MERKLEBLOCK: + case packetTypes.TX: + case packetTypes.NOTFOUND: + entry = this.response(packetTypes.DATA, packet); + assert(!entry || entry.jobs.length === 0); + break; + } + + return this.response(packet.type, packet); +}; + +/** + * Potentially timeout peer if it hasn't responded. + * @private + */ + +Peer.prototype.maybeTimeout = function maybeTimeout() { + var keys = Object.keys(this.responseMap); + var now = util.ms(); + var i, key, entry, name; + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + entry = this.responseMap[key]; + if (now > entry.timeout) { + name = packets.typesByVal[key]; + this.error('Peer is stalling (%s).', name.toLowerCase()); + this.destroy(); + return; + } + } + + if (!this.pool.syncing || this.chain.synced) + return; + + if (!this.isLoader()) + return; + + if (now > this.lastBlock + 60000) { + this.error('Peer is stalling (block).'); + this.destroy(); + } +}; + +/** + * Wait for a packet to be received from peer. + * @private + * @param {Number} type - Packet type. + * @param {Number} timeout + * @returns {RequestEntry} + */ + +Peer.prototype.request = function request(type, timeout) { + var entry = this.responseMap[type]; + + if (this.destroyed) + return; + + if (!entry) { + entry = new RequestEntry(); + this.responseMap[type] = entry; + } + + entry.setTimeout(timeout); + + return entry; +}; + +/** + * Fulfill awaiting requests created with {@link Peer#request}. + * @private + * @param {Number} type - Packet type. + * @param {Object} payload + */ + +Peer.prototype.response = function response(type, payload) { + var entry = this.responseMap[type]; + + if (!entry) + return; + + delete this.responseMap[type]; + + return entry; +}; + +/** + * Wait for a packet to be received from peer. + * @private + * @param {Number} type - Packet type. + * @returns {Promise} - Returns Object(payload). + * Executed on timeout or once packet is received. + */ + +Peer.prototype.wait = function wait(type, timeout) { + var self = this; + return new Promise(function(resolve, reject) { + var entry; + + if (self.destroyed) + return reject(new Error('Request destroyed.')); + + entry = self.request(type); + + entry.setTimeout(timeout); + entry.addJob(resolve, reject); + }); +}; + /** * Send a packet. * @param {Packet} packet @@ -1049,6 +1152,8 @@ Peer.prototype.send = function send(packet) { } } + this.addTimeout(packet); + this.sendRaw(packet.cmd, packet.toRaw(), checksum); }; @@ -1085,57 +1190,6 @@ Peer.prototype.error = function error(err) { this.emit('error', err); }; -/** - * Wait for a packet to be received from peer. - * @private - * @param {String} cmd - Packet name. - * @returns {Promise} - Returns Object(payload). - * Executed on timeout or once packet is received. - */ - -Peer.prototype.request = function request(cmd) { - var self = this; - return new Promise(function(resolve, reject) { - var entry; - - if (self.destroyed) - return reject(new Error('Request destroyed.')); - - entry = new RequestEntry(self, cmd, resolve, reject); - - if (!self.requestMap[cmd]) - self.requestMap[cmd] = new List(); - - self.requestMap[cmd].push(entry); - }); -}; - -/** - * Fulfill awaiting requests created with {@link Peer#request}. - * @private - * @param {String} cmd - Packet name. - * @param {Object} payload - */ - -Peer.prototype.response = function response(cmd, payload) { - var queue = this.requestMap[cmd]; - var entry; - - if (!queue) - return false; - - entry = queue.shift(); - assert(entry); - - if (queue.size === 0) - delete this.requestMap[cmd]; - - entry.stop(); - entry.resolve(payload); - - return true; -}; - /** * Calculate peer block inv type (filtered, * compact, witness, or non-witness). @@ -1146,7 +1200,7 @@ Peer.prototype.blockType = function blockType() { if (this.options.spv) return invTypes.FILTERED_BLOCK; - if (this.options.compact && this.compactMode) { + if (this.options.compact && this.compactMode !== -1) { if (!this.options.witness || this.compactWitness) return invTypes.CMPCT_BLOCK; } @@ -1171,19 +1225,47 @@ Peer.prototype.txType = function txType() { /** * Send `getdata` to peer. - * @param {LoadRequest[]} items + * @param {InvItem[]} items */ Peer.prototype.getData = function getData(items) { - var inv = []; - var i, item; + this.send(new packets.GetDataPacket(items)); +}; - for (i = 0; i < items.length; i++) { - item = items[i]; - inv.push(item.toInv()); +/** + * Send batched `getdata` to peer. + * @param {InvType} type + * @param {Hash[]} hashes + */ + +Peer.prototype.getItems = function getItems(type, hashes) { + var items = []; + var i, hash; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + items.push(new InvItem(type, hash)); } - this.send(new packets.GetDataPacket(inv)); + this.getData(items); +}; + +/** + * Send batched `getdata` to peer (blocks). + * @param {Hash[]} hashes + */ + +Peer.prototype.getBlock = function getBlock(hashes) { + this.getItems(this.blockType(), hashes); +}; + +/** + * Send batched `getdata` to peer (txs). + * @param {Hash[]} hashes + */ + +Peer.prototype.getTX = function getTX(hashes) { + this.getItems(this.txType(), hashes); }; /** @@ -1224,6 +1306,8 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { */ Peer.prototype.onPacket = co(function* onPacket(packet) { + var entry; + if (this.destroyed) throw new Error('Destroyed peer sent a packet.'); @@ -1247,85 +1331,124 @@ Peer.prototype.onPacket = co(function* onPacket(packet) { this.flushMerkle(); } - this.lastRecv = util.ms(); + entry = this.fulfill(packet); switch (packet.type) { case packetTypes.VERSION: - return yield this.handleVersion(packet); + yield this.handleVersion(packet); + break; case packetTypes.VERACK: - return yield this.handleVerack(packet); + yield this.handleVerack(packet); + break; case packetTypes.PING: - return yield this.handlePing(packet); + yield this.handlePing(packet); + break; case packetTypes.PONG: - return yield this.handlePong(packet); + yield this.handlePong(packet); + break; case packetTypes.ALERT: - return yield this.handleAlert(packet); + yield this.handleAlert(packet); + break; case packetTypes.GETADDR: - return yield this.handleGetAddr(packet); + yield this.handleGetAddr(packet); + break; case packetTypes.ADDR: - return yield this.handleAddr(packet); + yield this.handleAddr(packet); + break; case packetTypes.INV: - return yield this.handleInv(packet); + yield this.handleInv(packet); + break; case packetTypes.GETDATA: - return yield this.handleGetData(packet); + yield this.handleGetData(packet); + break; case packetTypes.NOTFOUND: - return yield this.handleNotFound(packet); + yield this.handleNotFound(packet); + break; case packetTypes.GETBLOCKS: - return yield this.handleGetBlocks(packet); + yield this.handleGetBlocks(packet); + break; case packetTypes.GETHEADERS: - return yield this.handleGetHeaders(packet); + yield this.handleGetHeaders(packet); + break; case packetTypes.HEADERS: - return yield this.handleHeaders(packet); + yield this.handleHeaders(packet); + break; case packetTypes.SENDHEADERS: - return yield this.handleSendHeaders(packet); + yield this.handleSendHeaders(packet); + break; case packetTypes.BLOCK: - return yield this.handleBlock(packet); + yield this.handleBlock(packet); + break; case packetTypes.TX: - return yield this.handleTX(packet); + yield this.handleTX(packet); + break; case packetTypes.REJECT: - return yield this.handleReject(packet); + yield this.handleReject(packet); + break; case packetTypes.MEMPOOL: - return yield this.handleMempool(packet); + yield this.handleMempool(packet); + break; case packetTypes.FILTERLOAD: - return yield this.handleFilterLoad(packet); + yield this.handleFilterLoad(packet); + break; case packetTypes.FILTERADD: - return yield this.handleFilterAdd(packet); + yield this.handleFilterAdd(packet); + break; case packetTypes.FILTERCLEAR: - return yield this.handleFilterClear(packet); + yield this.handleFilterClear(packet); + break; case packetTypes.MERKLEBLOCK: - return yield this.handleMerkleBlock(packet); + yield this.handleMerkleBlock(packet); + break; case packetTypes.GETUTXOS: - return yield this.handleGetUTXOs(packet); + yield this.handleGetUTXOs(packet); + break; case packetTypes.UTXOS: - return yield this.handleUTXOs(packet); + yield this.handleUTXOs(packet); + break; case packetTypes.HAVEWITNESS: - return yield this.handleHaveWitness(packet); + yield this.handleHaveWitness(packet); + break; case packetTypes.FEEFILTER: - return yield this.handleFeeFilter(packet); + yield this.handleFeeFilter(packet); + break; case packetTypes.SENDCMPCT: - return yield this.handleSendCmpct(packet); + yield this.handleSendCmpct(packet); + break; case packetTypes.CMPCTBLOCK: - return yield this.handleCmpctBlock(packet); + yield this.handleCmpctBlock(packet); + break; case packetTypes.GETBLOCKTXN: - return yield this.handleGetBlockTxn(packet); + yield this.handleGetBlockTxn(packet); + break; case packetTypes.BLOCKTXN: - return yield this.handleBlockTxn(packet); + yield this.handleBlockTxn(packet); + break; case packetTypes.ENCINIT: - return yield this.handleEncinit(packet); + yield this.handleEncinit(packet); + break; case packetTypes.ENCACK: - return yield this.handleEncack(packet); + yield this.handleEncack(packet); + break; case packetTypes.AUTHCHALLENGE: - return yield this.handleAuthChallenge(packet); + yield this.handleAuthChallenge(packet); + break; case packetTypes.AUTHREPLY: - return yield this.handleAuthReply(packet); + yield this.handleAuthReply(packet); + break; case packetTypes.AUTHPROPOSE: - return yield this.handleAuthPropose(packet); + yield this.handleAuthPropose(packet); + break; case packetTypes.UNKNOWN: - return yield this.handleUnknown(packet); + yield this.handleUnknown(packet); + break; default: assert(false, 'Bad packet type.'); break; } + + if (entry) + entry.resolve(packet); }); /** @@ -1336,22 +1459,12 @@ Peer.prototype.onPacket = co(function* onPacket(packet) { Peer.prototype.flushMerkle = function flushMerkle() { assert(this.lastMerkle); - this.fire('merkleblock', this.lastMerkle); + this.lastBlock = util.ms(); + this.emit('merkleblock', this.lastMerkle); this.lastMerkle = null; this.waitingTX = 0; }; -/** - * Emit an event and fulfill a response. - * @param {String} cmd - * @param {Object} payload - */ - -Peer.prototype.fire = function fire(cmd, payload) { - this.response(cmd, payload); - this.emit(cmd, payload); -}; - /** * Handle `filterload` packet. * @private @@ -1365,7 +1478,7 @@ Peer.prototype.handleFilterLoad = co(function* handleFilterLoad(packet) { } this.spvFilter = packet.filter; - this.relay = true; + this.noRelay = false; }); /** @@ -1385,7 +1498,7 @@ Peer.prototype.handleFilterAdd = co(function* handleFilterAdd(packet) { if (this.spvFilter) this.spvFilter.add(data); - this.relay = true; + this.noRelay = false; }); /** @@ -1398,7 +1511,7 @@ Peer.prototype.handleFilterClear = co(function* handleFilterClear(packet) { if (this.spvFilter) this.spvFilter.reset(); - this.relay = true; + this.noRelay = false; }); /** @@ -1444,7 +1557,7 @@ Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { this.feeRate = rate; - this.fire('feefilter', rate); + this.emit('feefilter', rate); }); /** @@ -1456,7 +1569,7 @@ Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { Peer.prototype.handleUTXOs = co(function* handleUTXOs(utxos) { this.logger.debug('Received %d utxos (%s).', utxos.coins.length, this.hostname); - this.fire('utxos', utxos); + this.emit('utxos', utxos); }); /** @@ -1526,7 +1639,7 @@ Peer.prototype.handleGetUTXOs = co(function* handleGetUTXOs(packet) { Peer.prototype.handleHaveWitness = co(function* handleHaveWitness(packet) { this.haveWitness = true; - this.fire('havewitness'); + this.emit('havewitness'); }); /** @@ -1624,19 +1737,58 @@ Peer.prototype.handleGetBlocks = co(function* handleGetBlocks(packet) { /** * Handle `version` packet. * @private - * @param {VersionPacket} + * @param {VersionPacket} packet */ -Peer.prototype.handleVersion = co(function* handleVersion(version) { +Peer.prototype.handleVersion = co(function* handleVersion(packet) { if (this.version) throw new Error('Peer sent a duplicate version.'); - this.relay = version.relay; - this.version = version; + this.version = packet; + this.noRelay = packet.noRelay; + + if (!this.network.selfConnect) { + if (util.equal(packet.nonce, this.pool.localNonce)) + throw new Error('We connected to ourself. Oops.'); + } + + if (packet.version < constants.MIN_VERSION) + throw new Error('Peer does not support required protocol version.'); + + if (this.options.witness) { + this.haveWitness = packet.hasWitness(); + if (!this.haveWitness && this.network.oldWitness) { + try { + yield this.wait(packetTypes.HAVEWITNESS, 10000); + } catch (err) { + ; + } + } + } + + if (this.outbound) { + if (!packet.hasNetwork()) + throw new Error('Peer does not support network services.'); + + if (this.options.headers) { + if (!packet.hasHeaders()) + throw new Error('Peer does not support getheaders.'); + } + + if (this.options.spv) { + if (!packet.hasBloom()) + throw new Error('Peer does not support BIP37.'); + } + + if (this.options.witness) { + if (!this.haveWitness) + throw new Error('Peer does not support segregated witness.'); + } + } this.send(new packets.VerackPacket()); - this.fire('version', version); + this.emit('version', packet); }); /** @@ -1646,7 +1798,8 @@ Peer.prototype.handleVersion = co(function* handleVersion(version) { */ Peer.prototype.handleVerack = co(function* handleVerack(packet) { - this.fire('verack'); + this.ack = true; + this.emit('verack'); }); /** @@ -1686,28 +1839,28 @@ Peer.prototype.handleMempool = co(function* handleMempool(packet) { */ Peer.prototype.getBroadcasted = function getBroadcasted(item) { + var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; var entry = this.pool.invMap[item.hash]; if (!entry) return; + if (type !== entry.type) { + this.logger.debug( + 'Peer requested item with the wrong type (%s).', + this.hostname); + return; + } + this.logger.debug( 'Peer requested %s %s as a %s packet (%s).', - entry.type === invTypes.TX ? 'tx' : 'block', - util.revHex(entry.hash), + item.isTX() ? 'tx' : 'block', + item.rhash(), item.hasWitness() ? 'witness' : 'normal', this.hostname); entry.ack(this); - if (item.isTX()) { - if (entry.type !== invTypes.TX) - return; - } else { - if (entry.type !== invTypes.BLOCK) - return; - } - return entry.msg; }; @@ -1959,7 +2112,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { */ Peer.prototype.handleNotFound = co(function* handleNotFound(packet) { - this.fire('notfound', packet.items); + this.emit('notfound', packet.items); }); /** @@ -1989,7 +2142,7 @@ Peer.prototype.handleAddr = co(function* handleAddr(packet) { this.pool.peers.size(), this.hostname); - this.fire('addr', addrs); + this.emit('addr', addrs); }); /** @@ -1999,7 +2152,7 @@ Peer.prototype.handleAddr = co(function* handleAddr(packet) { */ Peer.prototype.handlePing = co(function* handlePing(packet) { - this.fire('ping', this.minPing); + this.emit('ping', this.minPing); if (packet.nonce) this.send(new packets.PongPacket(packet.nonce)); }); @@ -2040,7 +2193,7 @@ Peer.prototype.handlePong = co(function* handlePong(packet) { this.challenge = null; - this.fire('pong', this.minPing); + this.emit('pong', this.minPing); }); /** @@ -2109,12 +2262,12 @@ Peer.prototype.handleInv = co(function* handleInv(packet) { for (i = 0; i < items.length; i++) { item = items[i]; switch (item.type) { - case invTypes.TX: - txs.push(item.hash); - break; case invTypes.BLOCK: blocks.push(item.hash); break; + case invTypes.TX: + txs.push(item.hash); + break; default: unknown = item.type; continue; @@ -2122,7 +2275,7 @@ Peer.prototype.handleInv = co(function* handleInv(packet) { this.invFilter.add(item.hash, 'hex'); } - this.fire('inv', items); + this.emit('inv', items); if (blocks.length > 0) this.emit('blocks', blocks); @@ -2159,7 +2312,7 @@ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { return; } - this.fire('headers', headers); + this.emit('headers', headers); }); /** @@ -2170,7 +2323,7 @@ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { Peer.prototype.handleSendHeaders = co(function* handleSendHeaders(packet) { this.preferHeaders = true; - this.fire('sendheaders'); + this.emit('sendheaders'); }); /** @@ -2187,7 +2340,9 @@ Peer.prototype.handleBlock = co(function* handleBlock(packet) { return; } - this.fire('block', packet.block); + this.lastBlock = util.ms(); + + this.emit('block', packet.block); }); /** @@ -2208,7 +2363,7 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { } } - this.fire('tx', tx); + this.emit('tx', tx); }); /** @@ -2220,7 +2375,7 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { Peer.prototype.handleReject = co(function* handleReject(reject) { var entry; - this.fire('reject', reject); + this.emit('reject', reject); if (!reject.hash) return; @@ -2241,7 +2396,7 @@ Peer.prototype.handleReject = co(function* handleReject(reject) { Peer.prototype.handleAlert = co(function* handleAlert(alert) { this.invFilter.add(alert.hash()); - this.fire('alert', alert); + this.emit('alert', alert); }); /** @@ -2256,7 +2411,7 @@ Peer.prototype.handleEncinit = co(function* handleEncinit(packet) { this.bip151.encinit(packet.publicKey, packet.cipher); - this.fire('encinit', packet); + this.emit('encinit', packet); this.send(this.bip151.toEncack()); }); @@ -2273,7 +2428,7 @@ Peer.prototype.handleEncack = co(function* handleEncack(packet) { this.bip151.encack(packet.publicKey); - this.fire('encack', packet); + this.emit('encack', packet); }); /** @@ -2290,7 +2445,7 @@ Peer.prototype.handleAuthChallenge = co(function* handleAuthChallenge(packet) { sig = this.bip150.challenge(packet.hash); - this.fire('authchallenge', packet.hash); + this.emit('authchallenge', packet.hash); this.send(new packets.AuthReplyPacket(sig)); }); @@ -2312,7 +2467,7 @@ Peer.prototype.handleAuthReply = co(function* handleAuthReply(packet) { if (hash) this.send(new packets.AuthProposePacket(hash)); - this.fire('authreply', packet.signature); + this.emit('authreply', packet.signature); }); /** @@ -2331,7 +2486,7 @@ Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(packet) { this.send(new packets.AuthChallengePacket(hash)); - this.fire('authpropose', packet.hash); + this.emit('authpropose', packet.hash); }); /** @@ -2342,7 +2497,7 @@ Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(packet) { Peer.prototype.handleUnknown = co(function* handleUnknown(packet) { this.logger.warning('Unknown packet: %s.', packet.cmd); - this.fire('unknown', packet); + this.emit('unknown', packet); }); /** @@ -2371,14 +2526,14 @@ Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { // with both version 1 and 2 (why // would you even _want_ non-witness // blocks if you use segwit??). - if (this.compactMode) + if (this.compactMode !== -1) return; this.logger.info('Peer initialized compact blocks (%s).', this.hostname); - this.compactMode = packet; + this.compactMode = packet.mode; this.compactWitness = packet.version === 2; - this.fire('sendcmpct', packet); + this.emit('sendcmpct', packet); }); /** @@ -2421,7 +2576,8 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { result = block.fillMempool(this.options.witness, this.mempool); if (result) { - this.fire('block', block.toBlock()); + this.lastBlock = util.ms(); + this.emit('block', block.toBlock()); this.logger.debug( 'Received full compact block %s (%s).', block.rhash(), this.hostname); @@ -2435,16 +2591,6 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { this.logger.debug( 'Received semi-full compact block %s (%s).', block.rhash(), this.hostname); - - try { - yield block.wait(10000); - } catch (e) { - this.logger.debug( - 'Compact block timed out: %s (%s).', - block.rhash(), this.hostname); - - delete this.compactBlocks[hash]; - } }); /** @@ -2491,7 +2637,7 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { this.send(new packets.BlockTxnPacket(res, this.compactWitness)); - this.fire('blocktxn', req); + this.emit('blocktxn', req); }); /** @@ -2509,8 +2655,6 @@ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { return; } - block.complete(); - delete this.compactBlocks[res.hash]; if (!block.fillMissing(res)) { @@ -2523,8 +2667,10 @@ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { 'Filled compact block %s (%s).', block.rhash(), this.hostname); - this.fire('block', block.toBlock()); - this.fire('getblocktxn', res); + this.lastBlock = util.ms(); + + this.emit('block', block.toBlock()); + this.emit('getblocktxn', res); }); /** @@ -2688,31 +2834,6 @@ Peer.prototype.ban = function ban() { this.pool.ban(this.address); }; -/** - * Mark connection attempt. - */ - -Peer.prototype.markAttempt = function markAttempt() { - this.pool.hosts.markAttempt(this.hostname); -}; - -/** - * Mark connection success. - */ - -Peer.prototype.markSuccess = function markSuccess() { - this.pool.hosts.markSuccess(this.hostname); -}; - -/** - * Mark ack success. - */ - -Peer.prototype.markAck = function markAck() { - assert(this.version); - this.pool.hosts.markAck(this.hostname, this.version.services); -}; - /** * Send a `reject` packet to peer. * @see Framer.reject @@ -2812,6 +2933,8 @@ Peer.prototype.sync = co(function* sync() { this.syncSent = true; + this.lastBlock = util.ms(); + if (this.options.headers) { if (!this.chain.tip.isGenesis()) tip = this.chain.tip.prevBlock; @@ -2829,7 +2952,6 @@ Peer.prototype.sync = co(function* sync() { Peer.prototype.inspect = function inspect() { return '= 4 && this.chain.total % 20 === 0) { this.logger.debug('Status:' + ' ts=%s height=%d highest=%d progress=%s' @@ -1130,8 +994,8 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { (this.chain.getProgress() * 100).toFixed(2) + '%', this.chain.total, this.chain.orphanCount, - this.activeBlocks, - peer.queueBlock.size, + this.activeRequests, + 0, block.bits, this.peers.size(), this.chain.locker.pending, @@ -1146,17 +1010,17 @@ Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { } }); - /** * Handle a transaction. Attempt to add to mempool. * @private - * @param {TX} tx * @param {Peer} peer + * @param {TX} tx * @returns {Promise} */ -Pool.prototype.handleTX = co(function* handleTX(tx, peer) { - var requested = this.fulfill(tx); +Pool.prototype.handleTX = co(function* handleTX(peer, tx) { + var hash = tx.hash('hex'); + var requested = this.fulfill(peer, hash); var i, missing; if (!requested) { @@ -1178,6 +1042,7 @@ Pool.prototype.handleTX = co(function* handleTX(tx, peer) { if (!this.mempool) { this.emit('tx', tx, peer); + this.scheduleRequests(peer); return; } @@ -1195,30 +1060,29 @@ Pool.prototype.handleTX = co(function* handleTX(tx, peer) { missing.length, peer.hostname); try { - for (i = 0; i < missing.length; i++) - this.getTX(peer, missing[i]); + this.getTX(peer, missing); } catch (e) { this.emit('error', e); } - - this.scheduleRequests(peer); } + this.scheduleRequests(peer); + this.emit('tx', tx, peer); }); /** * Handle `headers` packet from a given peer. * @private - * @param {Headers[]} headers * @param {Peer} peer + * @param {Headers[]} headers * @returns {Promise} */ -Pool.prototype.handleHeaders = co(function* handleHeaders(headers, peer) { +Pool.prototype.handleHeaders = co(function* handleHeaders(peer, headers) { var unlock = yield this.locker.lock(); try { - return yield this._handleHeaders(headers, peer); + return yield this._handleHeaders(peer, headers); } finally { unlock(); } @@ -1228,12 +1092,12 @@ Pool.prototype.handleHeaders = co(function* handleHeaders(headers, peer) { * Handle `headers` packet from * a given peer without a lock. * @private - * @param {Headers[]} headers * @param {Peer} peer + * @param {Headers[]} headers * @returns {Promise} */ -Pool.prototype._handleHeaders = co(function* handleHeaders(headers, peer) { +Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { var i, ret, header, hash, last; if (!this.options.headers) @@ -1251,13 +1115,6 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(headers, peer) { this.emit('headers', headers); - if (peer.isLoader()) { - // Reset interval to avoid stall behavior. - this.startInterval(); - // Reset timeout to avoid killing the loader. - this.startTimeout(); - } - for (i = 0; i < headers.length; i++) { header = headers[i]; hash = header.hash('hex'); @@ -1298,15 +1155,15 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(headers, peer) { * Handle `inv` packet from peer (containing only BLOCK types). * Potentially request headers if headers mode is enabled. * @private - * @param {Hash[]} hashes * @param {Peer} peer + * @param {Hash[]} hashes * @returns {Promise} */ Pool.prototype.handleBlockInv = co(function* handleBlockInv(hashes, peer) { var unlock = yield this.locker.lock(); try { - return yield this._handleBlockInv(hashes, peer); + return yield this._handleBlockInv(peer, hashes); } finally { unlock(); } @@ -1315,12 +1172,12 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(hashes, peer) { /** * Handle `inv` packet from peer without a lock. * @private - * @param {Hash[]} hashes * @param {Peer} peer + * @param {Hash[]} hashes * @returns {Promise} */ -Pool.prototype._handleBlockInv = co(function* handleBlockInv(hashes, peer) { +Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { var i, hash; if (!this.syncing) @@ -1352,13 +1209,6 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(hashes, peer) { this.emit('blocks', hashes); - if (peer.isLoader()) { - // Reset interval to avoid stall behavior. - this.startInterval(); - // Reset timeout to avoid killing the loader. - this.startTimeout(); - } - for (i = 0; i < hashes.length; i++) { hash = hashes[i]; @@ -1400,38 +1250,27 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(hashes, peer) { /** * Handle peer inv packet (txs). * @private - * @param {Hash[]} hashes * @param {Peer} peer + * @param {Hash[]} hashes */ -Pool.prototype.handleTXInv = function handleTXInv(hashes, peer) { - var i, hash; - +Pool.prototype.handleTXInv = function handleTXInv(peer, hashes) { this.emit('txs', hashes, peer); if (this.syncing && !this.chain.synced) return; - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - try { - this.getTX(peer, hash); - } catch (e) { - this.emit('error', e); - } - } - - this.scheduleRequests(peer); + this.getTX(peer, hashes); }; /** * Handle peer reject event. * @private - * @param {RejectPacket} reject * @param {Peer} peer + * @param {RejectPacket} reject */ -Pool.prototype.handleReject = function handleReject(reject, peer) { +Pool.prototype.handleReject = function handleReject(peer, reject) { this.logger.warning( 'Received reject (%s): msg=%s code=%s reason=%s hash=%s.', peer.hostname, @@ -1450,25 +1289,23 @@ Pool.prototype.handleReject = function handleReject(reject, peer) { * @param {Peer} peer */ -Pool.prototype.handleNotFound = function handleNotFound(items, peer) { - var i, item, req; +Pool.prototype.handleNotFound = function handleNotFound(peer, items) { + var i, item; for (i = 0; i < items.length; i++) { item = items[i]; - req = this.requestMap[item.hash]; - if (req && req.peer === peer) - req.finish(new Error('Not found.')); + this.fulfill(peer, item.hash); } }; /** * Handle an alert packet. * @private - * @param {AlertPacket} alert * @param {Peer} peer + * @param {AlertPacket} alert */ -Pool.prototype.handleAlert = function handleAlert(alert, peer) { +Pool.prototype.handleAlert = function handleAlert(peer, alert) { var now = this.network.now(); if (!alert.verify(this.network.alertKey)) { @@ -1542,7 +1379,6 @@ Pool.prototype.hasReject = function hasReject(hash) { */ Pool.prototype.addInbound = function addInbound(socket) { - var self = this; var peer; if (!this.loaded) @@ -1554,9 +1390,7 @@ Pool.prototype.addInbound = function addInbound(socket) { this.peers.add(peer); - util.nextTick(function() { - self.emit('peer', peer); - }); + this.emit('peer', peer); }; /** @@ -1605,7 +1439,6 @@ Pool.prototype.getHost = function getHost(unique) { */ Pool.prototype.addOutbound = function addOutbound() { - var self = this; var peer, addr; if (!this.loaded) @@ -1627,9 +1460,7 @@ Pool.prototype.addOutbound = function addOutbound() { this.peers.add(peer); - util.nextTick(function() { - self.emit('peer', peer); - }); + this.emit('peer', peer); }; /** @@ -1637,10 +1468,13 @@ Pool.prototype.addOutbound = function addOutbound() { * @private */ -Pool.prototype.fillPeers = function fillPeers() { +Pool.prototype.fillOutbound = function fillOutbound() { var need = this.maxOutbound - this.peers.outbound; var i; + if (!this.peers.load) + this.addLoader(); + if (need <= 0) return; @@ -1652,6 +1486,23 @@ Pool.prototype.fillPeers = function fillPeers() { this.addOutbound(); }; +/** + * Attempt to refill the pool with peers (no lock). + * @private + */ + +Pool.prototype.refill = function refill() { + var self = this; + + if (this.pendingRefill != null) + return; + + this.pendingRefill = setTimeout(function() { + self.pendingRefill = null; + self.fillOutbound(); + }, 3000); +}; + /** * Remove a peer from any list. Drop all load requests. * @private @@ -1659,22 +1510,15 @@ Pool.prototype.fillPeers = function fillPeers() { */ Pool.prototype.removePeer = function removePeer(peer) { - var i, hashes, hash, item; - - if (peer.isLoader() && this.syncing) { - this.stopTimeout(); - this.stopInterval(); - } + var i, hashes, hash; this.peers.remove(peer); - hashes = Object.keys(this.requestMap); + hashes = Object.keys(peer.requestMap); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - item = this.requestMap[hash]; - if (item.peer === peer) - item.finish(new Error('Peer closed.')); + this.fulfill(peer, hash); } }; @@ -1768,7 +1612,18 @@ Pool.prototype.updateWatch = function updateWatch() { */ Pool.prototype.watchAddress = function watchAddress(address) { - this.watch(Address.getHash(address)); + var hash = Address.getHash(address); + assert(hash, 'Bad address.'); + this.watch(hash); +}; + +/** + * Add an outpoint to the bloom filter (SPV-only). + * @param {Outpoint} outpoint + */ + +Pool.prototype.watchOutpoint = function watchOutpoint(outpoint) { + this.watch(outpoint.toRaw()); }; /** @@ -1780,8 +1635,6 @@ Pool.prototype.watchAddress = function watchAddress(address) { */ Pool.prototype.getBlock = function getBlock(peer, hash) { - var item; - if (!this.loaded) return; @@ -1789,14 +1642,12 @@ Pool.prototype.getBlock = function getBlock(peer, hash) { throw new Error('Peer handshake not complete (getdata).'); if (peer.destroyed) - throw new Error('Peer is already destroyed (getdata).'); + throw new Error('Peer is destroyed (getdata).'); if (this.requestMap[hash]) return; - item = new LoadRequest(this, peer, invTypes.BLOCK, hash); - - peer.queueBlock.push(item); + peer.queueMap[hash] = true; }; /** @@ -1827,8 +1678,9 @@ Pool.prototype.hasBlock = co(function* hasBlock(hash) { * @returns {Boolean} */ -Pool.prototype.getTX = function getTX(peer, hash) { - var item; +Pool.prototype.getTX = function getTX(peer, hashes) { + var items = []; + var i, hash; if (!this.loaded) return; @@ -1837,16 +1689,25 @@ Pool.prototype.getTX = function getTX(peer, hash) { throw new Error('Peer handshake not complete (getdata).'); if (peer.destroyed) - throw new Error('Peer is already destroyed (getdata).'); + throw new Error('Peer is destroyed (getdata).'); - if (this.hasTX(hash)) - return true; + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; - item = new LoadRequest(this, peer, invTypes.TX, hash); + if (this.hasTX(hash)) + continue; - peer.queueTX.push(item); + assert(!this.requestMap[hash]); - return false; + this.requestMap[hash] = true; + peer.requestMap[hash] = true; + + this.activeRequests++; + + items.push(hash); + } + + peer.getTX(items); }; /** @@ -1897,7 +1758,6 @@ Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) { yield this.chain.onDrain(); this.sendBlockRequests(peer); - this.sendTXRequests(peer); this.scheduled = false; }); @@ -1909,82 +1769,74 @@ Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) { */ Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) { - var i, size, items, item; + var queue = Object.keys(peer.queueMap); + var hashes = []; + var i, size, hash; - if (peer.queueBlock.size === 0) + if (queue.length === 0) return; if (this.options.spv) { - if (this.activeBlocks >= 2000) + if (this.activeRequests >= 2000) return; - size = peer.queueBlock.size; + size = Math.min(queue.length, 50000); } else { size = this.network.getBatchSize(this.chain.height); - if (this.activeBlocks >= size) + if (this.activeRequests >= size) return; } - items = peer.queueBlock.slice(size); + for (i = 0; i < queue.length; i++) { + hash = queue[i]; - for (i = 0; i < items.length; i++) { - item = items[i]; - item.start(); + delete peer.queueMap[hash]; + + if (this.requestMap[hash]) + continue; + + assert(!this.requestMap[hash]); + + this.requestMap[hash] = true; + peer.requestMap[hash] = true; + + this.activeRequests++; + + hashes.push(hash); + + if (hashes.length === size) + break; } this.logger.debug( 'Requesting %d/%d blocks from peer with getdata (%s).', - items.length, - this.activeBlocks, + hashes.length, + this.activeRequests, peer.hostname); - peer.getData(items); + peer.getBlock(hashes); }; /** - * Schedule next batch of `getdata` tx requests for peer. + * Fulfill a requested item. * @param {Peer} peer - * @returns {Promise} + * @param {Hash} hash + * @returns {Boolean} */ -Pool.prototype.sendTXRequests = function sendTXRequests(peer) { - var size = peer.queueTX.size; - var i, items, item; - - if (size === 0) - return; - - items = peer.queueTX.slice(size); - - for (i = 0; i < items.length; i++) { - item = items[i]; - item.start(); - } - - this.logger.debug( - 'Requesting %d/%d txs from peer with getdata (%s).', - size, this.activeTX, peer.hostname); - - peer.getData(items); -}; - -/** - * Fulfill a requested block. - * @param {Hash} - * @returns {LoadRequest|null} - */ - -Pool.prototype.fulfill = function fulfill(data) { - var hash = data.hash('hex'); - var item = this.requestMap[hash]; - - if (!item) +Pool.prototype.fulfill = function fulfill(peer, hash) { + if (!peer.requestMap[hash]) return false; - item.finish(); + delete peer.requestMap[hash]; - return item; + assert(this.requestMap[hash]); + delete this.requestMap[hash]; + + this.activeRequests--; + + return true; }; /** @@ -2007,7 +1859,7 @@ Pool.prototype.broadcast = function broadcast(msg) { } return new Promise(function(resolve, reject) { - item.addCallback(co.wrap(resolve, reject)); + item.addJob(resolve, reject); }); }; @@ -2248,135 +2100,6 @@ PeerList.prototype.destroy = function destroy() { } }; -/** - * Represents an in-flight block or transaction. - * @exports LoadRequest - * @constructor - * @private - * @param {Pool} pool - * @param {Peer} peer - * @param {Number} type - `getdata` type (see {@link constants.inv}). - * @param {Hash} hash - * @returns {Promise} - */ - -function LoadRequest(pool, peer, type, hash) { - if (!(this instanceof LoadRequest)) - return new LoadRequest(pool, peer, type, hash); - - this.pool = pool; - this.peer = peer; - this.type = type; - this.hash = hash; - this.active = false; - this.timeout = null; - this.onTimeout = this._onTimeout.bind(this); - - this.prev = null; - this.next = null; - - assert(!this.pool.requestMap[this.hash]); - this.pool.requestMap[this.hash] = this; -} - -/** - * Destroy load request with an error. - */ - -LoadRequest.prototype.destroy = function destroy() { - return this.finish(); -}; - -/** - * Handle timeout. Potentially kill loader. - * @private - */ - -LoadRequest.prototype._onTimeout = function _onTimeout() { - if (this.type === invTypes.BLOCK && this.peer.isLoader()) { - this.pool.logger.debug( - 'Loader took too long serving a block. Finding a new one.'); - this.peer.destroy(); - } - return this.finish(); -}; - -/** - * Mark the request as in-flight. Start timeout. - */ - -LoadRequest.prototype.start = function start() { - this.timeout = setTimeout(this.onTimeout, this.pool.requestTimeout); - - this.active = true; - this.pool.activeRequest++; - - if (this.type === invTypes.TX) - this.pool.activeTX++; - else - this.pool.activeBlocks++; - - return this; -}; - -/** - * Mark the request as completed. - * Remove from queue and map. Clear timeout. - */ - -LoadRequest.prototype.finish = function finish() { - var entry = this.pool.requestMap[this.hash]; - - if (entry) { - assert(entry === this); - delete this.pool.requestMap[this.hash]; - if (this.active) { - this.active = false; - this.pool.activeRequest--; - if (this.type === invTypes.TX) - this.pool.activeTX--; - else - this.pool.activeBlocks--; - } - } - - if (this.type === invTypes.TX) - this.peer.queueTX.remove(this); - else - this.peer.queueBlock.remove(this); - - if (this.timeout != null) { - clearTimeout(this.timeout); - this.timeout = null; - } -}; - -/** - * Inspect the load request. - * @returns {String} - */ - -LoadRequest.prototype.inspect = function inspect() { - return ''; -}; - -/** - * Convert load request to an inv item. - * @returns {InvItem} - */ - -LoadRequest.prototype.toInv = function toInv() { - var type = this.type === invTypes.BLOCK - ? this.peer.blockType() - : this.peer.txType(); - - return new InvItem(type, this.hash); -}; - /** * Represents an item that is broadcasted via an inv/getdata cycle. * @exports BroadcastItem @@ -2403,21 +2126,18 @@ function BroadcastItem(pool, msg) { this.hash = item.hash; this.type = item.type; this.msg = msg; - this.callback = []; - - this.prev = null; - this.next = null; + this.jobs = []; } util.inherits(BroadcastItem, EventEmitter); /** - * Add a callback to be executed on ack, timeout, or reject. + * Add a job to be executed on ack, timeout, or reject. * @returns {Promise} */ -BroadcastItem.prototype.addCallback = function addCallback(callback) { - this.callback.push(callback); +BroadcastItem.prototype.addJob = function addJob(resolve, reject) { + this.jobs.push(new Job(resolve, reject)); }; /** @@ -2429,7 +2149,6 @@ BroadcastItem.prototype.start = function start() { assert(!this.pool.invMap[this.hash], 'Already started.'); this.pool.invMap[this.hash] = this; - assert(this.pool.invItems.push(this)); this.refresh(); @@ -2478,7 +2197,7 @@ BroadcastItem.prototype.announce = function announce() { */ BroadcastItem.prototype.finish = function finish(err) { - var i; + var i, job; assert(this.timeout, 'Already finished.'); assert(this.pool.invMap[this.hash], 'Already finished.'); @@ -2487,12 +2206,17 @@ BroadcastItem.prototype.finish = function finish(err) { this.timeout = null; delete this.pool.invMap[this.hash]; - assert(this.pool.invItems.remove(this)); - for (i = 0; i < this.callback.length; i++) - this.callback[i](err); + for (i = 0; i < this.jobs.length; i++) { + job = this.jobs[i]; + if (err) { + job.reject(err); + continue; + } + job.resolve(); + } - this.callback.length = 0; + this.jobs.length = 0; }; /** @@ -2502,15 +2226,17 @@ BroadcastItem.prototype.finish = function finish(err) { BroadcastItem.prototype.ack = function ack(peer) { var self = this; - var i; + var i, job; setTimeout(function() { self.emit('ack', peer); - for (i = 0; i < self.callback.length; i++) - self.callback[i](null, true); + for (i = 0; i < self.jobs.length; i++) { + job = self.jobs[i]; + job.resolve(true); + } - self.callback.length = 0; + self.jobs.length = 0; }, 1000); }; @@ -2520,14 +2246,16 @@ BroadcastItem.prototype.ack = function ack(peer) { */ BroadcastItem.prototype.reject = function reject(peer) { - var i; + var i, job; this.emit('reject', peer); - for (i = 0; i < this.callback.length; i++) - this.callback[i](null, false); + for (i = 0; i < this.jobs.length; i++) { + job = this.jobs[i]; + job.resolve(false); + } - this.callback.length = 0; + this.jobs.length = 0; }; /** @@ -2542,6 +2270,16 @@ BroadcastItem.prototype.inspect = function inspect() { + '>'; }; +/** + * Job + * @constructor + */ + +function Job(resolve, reject) { + this.resolve = resolve; + this.reject = reject; +} + /* * Expose */ diff --git a/lib/primitives/invitem.js b/lib/primitives/invitem.js index 098c9de0..c87e7244 100644 --- a/lib/primitives/invitem.js +++ b/lib/primitives/invitem.js @@ -142,6 +142,15 @@ InvItem.prototype.hasWitness = function hasWitness() { return (this.type & constants.WITNESS_MASK) !== 0; }; +/** + * Get little-endian hash. + * @returns {Hash} + */ + +InvItem.prototype.rhash = function() { + return util.revHex(this.hash); +}; + /* * Expose */ diff --git a/package.json b/package.json index e587c4fa..32c16b9a 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "browserify": "13.1.0", "hash.js": "1.0.3", "jsdoc": "3.4.0", + "jshint": "2.9.4", "level-js": "2.2.4", "mocha": "3.0.2", "uglify-js": "2.7.3"