diff --git a/lib/net/peer.js b/lib/net/peer.js index 44cc583e..8eb14e55 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -125,8 +125,10 @@ function Peer(pool, addr, socket) { this.banScore = 0; - this.pingTimeout = null; + this.pingTimer = null; this.pingInterval = 30000; + this.stallTimer = null; + this.stallInterval = 5000; this.requestTimeout = 10000; this.requestMap = {}; @@ -243,7 +245,7 @@ Peer.prototype._init = function init() { }); this.bip151.on('rekey', function() { self.logger.debug('Rekeying with peer (%s).', self.hostname); - self.send(self.bip151.toRekey()); + self.trySend(self.bip151.toRekey()); }); } @@ -286,6 +288,7 @@ Peer.prototype.connect = function connect(port, host) { Peer.prototype.open = co(function* open() { try { yield this._connect(); + yield this._stallify(); yield this._bip151(); yield this._bip150(); yield this._handshake(); @@ -295,11 +298,7 @@ Peer.prototype.open = co(function* open() { return; } - // The peer may have been killed before - // the handshake finished. In this case, - // do not emit `open`. - if (this.destroyed) - return; + assert(!this.destroyed); // Finally we can let the pool know // that this peer is ready to go. @@ -339,6 +338,20 @@ Peer.prototype._connect = function _connect() { }); }; +/** + * Setup stall timer. + * @private + */ + +Peer.prototype._stallify = function _stallify() { + var self = this; + assert(!this.stallTimer); + this.stallTimer = setInterval(function() { + self.maybeStall(); + }, this.stallInterval); + return Promise.resolve(); +}; + /** * Handle `connect` event (called immediately * if a socket was passed into peer). @@ -354,7 +367,7 @@ Peer.prototype._bip151 = co(function* _bip151() { this.logger.info('Attempting BIP151 handshake (%s).', this.hostname); - this.send(this.bip151.toEncinit()); + yield this.send(this.bip151.toEncinit()); try { yield this.bip151.wait(3000); @@ -389,7 +402,7 @@ Peer.prototype._bip150 = co(function* _bip150() { if (this.bip150.outbound) { if (!this.bip150.peerIdentity) throw new Error('No known identity for peer.'); - this.send(this.bip150.toChallenge()); + yield this.send(this.bip150.toChallenge()); } yield this.bip150.wait(3000); @@ -410,13 +423,13 @@ Peer.prototype._bip150 = co(function* _bip150() { Peer.prototype._handshake = co(function* _handshake() { // Say hello. - this.sendVersion(); + yield this.sendVersion(); // Advertise our address. if (this.pool.address.host !== '0.0.0.0' && !this.options.selfish && this.pool.server) { - this.send(new packets.AddrPacket([this.pool.address])); + yield this.send(new packets.AddrPacket([this.pool.address])); } yield this.request('verack'); @@ -446,14 +459,14 @@ Peer.prototype._finalize = co(function* _finalize() { var self = this; // Setup the ping interval. - this.pingTimeout = setInterval(function() { + this.pingTimer = setInterval(function() { self.sendPing(); }, this.pingInterval); // Ask for headers-only. if (this.options.headers) { if (this.version.version >= 70012) - this.send(new packets.SendHeadersPacket()); + yield this.send(new packets.SendHeadersPacket()); } // Let them know we support segwit (old @@ -461,32 +474,30 @@ Peer.prototype._finalize = co(function* _finalize() { // of service bits). if (this.options.witness && this.network.oldWitness) { if (this.version.version >= 70012) - this.send(new packets.HaveWitnessPacket()); + yield this.send(new packets.HaveWitnessPacket()); } // We want compact blocks! if (this.options.compact) { if (this.version.version >= 70014) - this.sendCompact(); + yield this.sendCompact(); } // Find some more peers. - this.send(new packets.GetAddrPacket()); + yield this.send(new packets.GetAddrPacket()); // Relay our spv filter if we have one. - this.updateWatch(); + yield this.updateWatch(); // Announce our currently broadcasted items. - this.announce(this.pool.invItems); + yield this.announce(this.pool.invItems); // Set a fee rate filter. if (this.pool.feeRate !== -1) - this.sendFeeRate(this.pool.feeRate); + yield this.sendFeeRate(this.pool.feeRate); // Start syncing the chain. - this.sync(); - - yield co.wait(); + yield this.sync(); }); /** @@ -498,6 +509,15 @@ Peer.prototype.isLoader = function isLoader() { return this === this.pool.peers.load; }; +/** + * Broadcast items to peer (transactions or blocks). + * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items + */ + +Peer.prototype.tryAnnounce = function tryAnnounce(items) { + return this.announce().catch(util.nop); +}; + /** * Broadcast items to peer (transactions or blocks). * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items @@ -658,14 +678,11 @@ Peer.prototype.sendVersion = function sendVersion() { */ Peer.prototype.sendPing = function sendPing() { - if (this.maybeStall()) - return Promise.resolve(); - if (!this.version) return Promise.resolve(); if (this.version.version <= 60000) - return this.send(new packets.PingPacket()); + return this.trySend(new packets.PingPacket()); if (this.challenge) { this.logger.debug('Peer has not responded to ping (%s).', this.hostname); @@ -676,7 +693,7 @@ Peer.prototype.sendPing = function sendPing() { this.lastPing = util.ms(); this.challenge = util.nonce(); - return this.send(new packets.PingPacket(this.challenge)); + return this.trySend(new packets.PingPacket(this.challenge)); }; /** @@ -709,7 +726,7 @@ Peer.prototype.updateWatch = function updateWatch() { if (!this.options.spv) return Promise.resolve(); - return this.send(new packets.FilterLoadPacket(this.pool.spvFilter)); + return this.trySend(new packets.FilterLoadPacket(this.pool.spvFilter)); }; /** @@ -718,7 +735,7 @@ Peer.prototype.updateWatch = function updateWatch() { */ Peer.prototype.sendFeeRate = function sendFeeRate(rate) { - return this.send(new packets.FeeFilterPacket(rate)); + return this.trySend(new packets.FeeFilterPacket(rate)); }; /** @@ -731,7 +748,7 @@ Peer.prototype.destroy = function destroy() { if (this.destroyed) return; - this.finishDrain(); + this.finishDrain(new Error('Peer destroyed.')); this.destroyed = true; this.connected = false; @@ -745,9 +762,14 @@ Peer.prototype.destroy = function destroy() { if (this.bip150) this.bip150.destroy(); - if (this.pingTimeout != null) { - clearInterval(this.pingTimeout); - this.pingTimeout = null; + if (this.pingTimer != null) { + clearInterval(this.pingTimer); + this.pingTimer = null; + } + + if (this.stallTimer != null) { + clearInterval(this.stallTimer); + this.stallTimer = null; } if (this.connectTimeout != null) { @@ -783,7 +805,7 @@ Peer.prototype.destroy = function destroy() { Peer.prototype.write = function write(data) { if (this.destroyed) - return Promise.resolve(); + return Promise.reject(new Error('Peer destroyed (write).')); this.lastSend = util.ms(); @@ -804,7 +826,7 @@ Peer.prototype.onDrain = function onDrain(size) { var self = this; if (this.maybeStall()) - return Promise.resolve(); + return Promise.reject(new Error('Peer stalled (drain).')); this.drainStart = util.now(); this.drainSize += size; @@ -815,7 +837,7 @@ Peer.prototype.onDrain = function onDrain(size) { util.mb(this.drainSize), this.hostname); this.error('Peer stalled.'); - return Promise.resolve(); + return Promise.reject(new Error('Peer stalled (drain).')); } return new Promise(function(resolve, reject) { @@ -857,6 +879,16 @@ Peer.prototype.finishDrain = function finishDrain(err) { jobs[i](err); }; +/** + * Send a packet (no error handling). + * @param {Packet} packet + * @returns {Promise} + */ + +Peer.prototype.trySend = function send(packet) { + return this.send(packet).catch(util.nop); +}; + /** * Send a packet. * @param {Packet} packet @@ -1009,7 +1041,7 @@ Peer.prototype.getData = function getData(items) { data[i] = item; } - return this.send(new packets.GetDataPacket(data)); + return this.trySend(new packets.GetDataPacket(data)); }; /** @@ -2368,7 +2400,7 @@ Peer.prototype.sendAlert = function sendAlert(alert) { if (!this.invFilter.added(alert.hash())) return Promise.resolve(); - return this.send(alert); + return this.trySend(alert); }; /** @@ -2451,7 +2483,7 @@ Peer.prototype.sendMempool = function sendMempool() { 'Requesting inv packet from peer with mempool (%s).', this.hostname); - return this.send(new packets.MempoolPacket()); + return this.trySend(new packets.MempoolPacket()); }; /** @@ -2476,7 +2508,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { 'Sending reject packet to peer (%s).', this.hostname); - return this.send(reject); + return this.trySend(reject); }; /** @@ -2486,7 +2518,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { Peer.prototype.sendCompact = function sendCompact() { var version = this.options.witness ? 2 : 1; this.logger.info('Initializing compact blocks (%s).', this.hostname); - return this.send(new packets.SendCmpctPacket(0, version)); + return this.trySend(new packets.SendCmpctPacket(0, version)); }; /** @@ -2562,8 +2594,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { return; } - // Note: call and forget - this.sendGetBlocks(locator, root); + yield this.sendGetBlocks(locator, root); }); /** @@ -2575,8 +2606,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { var locator = yield this.chain.getLocator(tip); - // Note: call and forget - this.sendGetHeaders(locator, stop); + yield this.sendGetHeaders(locator, stop); }); /** @@ -2588,8 +2618,7 @@ Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) { var locator = yield this.chain.getLocator(tip); - // Note: call and forget - this.sendGetBlocks(locator, stop); + yield this.sendGetBlocks(locator, stop); }); /** @@ -2597,33 +2626,42 @@ Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) { * @returns {Promise} */ -Peer.prototype.sync = function sync() { +Peer.prototype.trySync = function trySync() { + return this.sync().catch(util.nop); +}; + +/** + * Start syncing from peer. + * @returns {Promise} + */ + +Peer.prototype.sync = co(function* sync() { var tip; if (!this.pool.syncing) - return Promise.resolve(); + return; if (!this.ack) - return Promise.resolve(); + return; if (this.syncSent) - return Promise.resolve(); + return; if (!this.version.hasNetwork()) - return Promise.resolve(); + return; if (this.options.witness && !this.version.hasWitness()) - return Promise.resolve(); + return; if (!this.isLoader()) { if (!this.chain.synced) - return Promise.resolve(); + return; } // Ask for the mempool if we're synced. if (this.network.requestMempool) { if (this.isLoader() && this.chain.synced) - this.sendMempool(); + yield this.sendMempool(); } this.syncSent = true; @@ -2632,11 +2670,11 @@ Peer.prototype.sync = function sync() { if (!this.chain.tip.isGenesis()) tip = this.chain.tip.prevBlock; - return this.getHeaders(tip); + return yield this.getHeaders(tip); } - return this.getBlocks(); -}; + return yield this.getBlocks(); +}); /** * Inspect the peer. diff --git a/lib/net/pool.js b/lib/net/pool.js index 5c87e6e0..7ef82094 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -639,7 +639,7 @@ Pool.prototype.setLoader = function setLoader(peer) { this.peers.repurpose(peer); this.fillPeers(); - peer.sync(); + peer.trySync(); util.nextTick(function() { self.emit('loader', peer); @@ -675,10 +675,10 @@ Pool.prototype.sync = function sync() { var i; if (this.peers.load) - this.peers.load.sync(); + this.peers.load.trySync(); for (i = 0; i < this.peers.regular.length; i++) - this.peers.regular[i].sync(); + this.peers.regular[i].trySync(); }; /** @@ -770,7 +770,7 @@ Pool.prototype.__handleHeaders = co(function* _handleHeaders(headers, peer) { last = hash; - yield this.getData(peer, this.blockType, hash); + yield this.getBlock(peer, hash); } // Schedule the getdata's we just added. @@ -830,7 +830,7 @@ Pool.prototype._handleBlocks = co(function* _handleBlocks(hashes, peer) { continue; } - exists = yield this.getData(peer, this.blockType, hash); + exists = yield this.getBlock(peer, hash); // Normally we request the hashContinue. // In the odd case where we already have @@ -1200,7 +1200,7 @@ Pool.prototype.createPeer = function createPeer(addr, socket) { for (i = 0; i < txs.length; i++) { hash = txs[i]; try { - yield self.getData(peer, self.txType, hash); + yield self.getTX(peer, hash); } catch (e) { self.emit('error', e); } @@ -1367,7 +1367,7 @@ Pool.prototype._handleTX = co(function* _handleTX(tx, peer) { if (this.options.requestMissing && missing) { for (i = 0; i < missing.length; i++) - yield this.getData(peer, this.txType, missing[i]); + yield this.getTX(peer, missing[i]); } this.emit('tx', tx, peer); @@ -1545,46 +1545,23 @@ Pool.prototype.watchAddress = function watchAddress(address) { * Queue a `getdata` request to be sent. Checks existence * in the chain before requesting. * @param {Peer} peer - * @param {Number} type - `getdata` type (see {@link constants.inv}). - * @param {Hash} hash - {@link Block} or {@link TX} hash. + * @param {Hash} hash - Block hash. * @returns {Promise} */ -Pool.prototype.getData = co(function* getData(peer, type, hash) { - var self = this; - var item, exists; +Pool.prototype.getBlock = co(function* getBlock(peer, hash) { + var item; if (!this.loaded) return; - exists = yield this.has(peer, type, hash); - - if (exists) - return true; - if (peer.destroyed) throw new Error('Peer is already destroyed (getdata).'); - item = new LoadRequest(this, peer, type, hash); + if (yield this.hasBlock(hash)) + return true; - if (type === this.txType) { - if (peer.queueTX.length === 0) { - util.nextTick(function() { - self.logger.debug( - 'Requesting %d/%d txs from peer with getdata (%s).', - peer.queueTX.length, - self.activeTX, - peer.hostname); - - peer.getData(peer.queueTX); - peer.queueTX.length = 0; - }); - } - - peer.queueTX.push(item.start()); - - return false; - } + item = new LoadRequest(this, peer, this.blockType, hash); peer.queueBlock.push(item); @@ -1592,60 +1569,97 @@ Pool.prototype.getData = co(function* getData(peer, type, hash) { }); /** - * Test whether the pool has or has seen an item. + * Test whether the chain has or has seen an item. * @param {Peer} peer * @param {InvType} type * @param {Hash} hash * @returns {Promise} - Returns Boolean. */ -Pool.prototype.has = co(function* has(peer, type, hash) { - var exists = yield this.exists(type, hash); - - if (exists) +Pool.prototype.hasBlock = co(function* hasBlock(hash) { + // Check the chain. + if (yield this.chain.has(hash)) return true; // Check the pending requests. if (this.requestMap[hash]) return true; - if (type !== this.txType) - return false; - - // If we recently rejected this item. Ignore. - if (this.hasReject(hash)) { - this.logger.spam( - 'Peer sent a known reject of %s (%s).', - util.revHex(hash), peer.hostname); - return true; - } - return false; }); /** - * Test whether the chain or mempool has seen an item. + * Queue a `getdata` request to be sent. Checks existence + * in the mempool before requesting. + * @param {Peer} peer + * @param {Hash} hash - TX hash. + * @returns {Promise} + */ + +Pool.prototype.getTX = function getTX(peer, hash) { + var self = this; + var item; + + if (!this.loaded) + return; + + if (peer.destroyed) + throw new Error('Peer is already destroyed (getdata).'); + + if (this.hasTX(hash)) + return true; + + item = new LoadRequest(this, peer, this.txType, hash); + + if (peer.queueTX.length === 0) { + util.nextTick(function() { + self.logger.debug( + 'Requesting %d/%d txs from peer with getdata (%s).', + peer.queueTX.length, + self.activeTX, + peer.hostname); + + peer.getData(peer.queueTX); + peer.queueTX.length = 0; + }); + } + + peer.queueTX.push(item.start()); + + return false; +}; + +/** + * Test whether the mempool has or has seen an item. + * @param {Peer} peer * @param {InvType} type * @param {Hash} hash * @returns {Promise} - Returns Boolean. */ -Pool.prototype.exists = function exists(type, hash) { - if (type === this.txType) { +Pool.prototype.hasTX = function hasTX(hash) { + if (!this.mempool) { // Check the TX filter if // we don't have a mempool. - if (!this.mempool) { - if (this.txFilter.added(hash, 'hex')) - return Promise.resolve(false); - return Promise.resolve(true); - } - + if (!this.txFilter.added(hash, 'hex')) + return true; + } else { // Check the mempool. - return Promise.resolve(this.mempool.has(hash)); + if (this.mempool.has(hash)) + return true; } - // Check the chain. - return this.chain.has(hash); + // If we recently rejected this item. Ignore. + if (this.hasReject(hash)) { + this.logger.spam('Saw known reject of %s.', util.revHex(hash)); + return true; + } + + // Check the pending requests. + if (this.requestMap[hash]) + return true; + + return false; }; /** @@ -1769,10 +1783,10 @@ Pool.prototype.announce = function announce(msg) { var i; if (this.peers.load) - this.peers.load.announce(msg); + this.peers.load.tryAnnounce(msg); for (i = 0; i < this.peers.regular.length; i++) - this.peers.regular[i].announce(msg); + this.peers.regular[i].tryAnnounce(msg); }; /** @@ -2499,7 +2513,7 @@ BroadcastItem.prototype.refresh = function refresh() { */ BroadcastItem.prototype.announce = function announce() { - this.pool.announce(this); + return this.pool.announce(this); }; /**