diff --git a/lib/net/peer.js b/lib/net/peer.js index 07616b9b..d0a8e790 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -153,15 +153,6 @@ function Peer(pool, addr, socket) { this.port = addr.port; this.hostname = addr.hostname; - if (!socket) { - this.socket = this.connect(this.port, this.host); - this.outbound = true; - } else { - this.socket = socket; - this.connected = true; - this.pending = false; - } - if (this.options.bip151) { this.bip151 = new BIP151(); if (this.options.bip150) { @@ -178,6 +169,16 @@ function Peer(pool, addr, socket) { this.parser = new Parser(this); this.framer = new Framer(this); + if (!socket) { + this.socket = this.connect(this.port, this.host); + this.outbound = true; + } else { + this.socket = socket; + this.ts = util.now(); + this.connected = true; + this.pending = false; + } + this._init(); } @@ -222,7 +223,7 @@ Peer.prototype._init = function init() { }); this.socket.on('drain', function() { - self.finishDrain(); + self.drainSize = 0; }); this.socket.on('data', function(chunk) { @@ -252,7 +253,7 @@ Peer.prototype._init = function init() { }); this.bip151.on('rekey', function() { self.logger.debug('Rekeying with peer (%s).', self.hostname); - self.trySend(self.bip151.toRekey()); + self.send(self.bip151.toRekey()); }); } @@ -374,7 +375,7 @@ Peer.prototype._bip151 = co(function* _bip151() { this.logger.info('Attempting BIP151 handshake (%s).', this.hostname); - yield this.send(this.bip151.toEncinit()); + this.send(this.bip151.toEncinit()); try { yield this.bip151.wait(3000); @@ -409,7 +410,7 @@ Peer.prototype._bip150 = co(function* _bip150() { if (this.bip150.outbound) { if (!this.bip150.peerIdentity) throw new Error('No known identity for peer.'); - yield this.send(this.bip150.toChallenge()); + this.send(this.bip150.toChallenge()); } yield this.bip150.wait(3000); @@ -430,13 +431,13 @@ Peer.prototype._bip150 = co(function* _bip150() { Peer.prototype._handshake = co(function* _handshake() { // Say hello. - yield this.sendVersion(); + this.sendVersion(); // Advertise our address. if (this.pool.address.host !== '0.0.0.0' && !this.options.selfish && this.pool.server) { - yield this.send(new packets.AddrPacket([this.pool.address])); + this.send(new packets.AddrPacket([this.pool.address])); } yield this.request('verack'); @@ -462,7 +463,7 @@ Peer.prototype._handshake = co(function* _handshake() { * @private */ -Peer.prototype._finalize = co(function* _finalize() { +Peer.prototype._finalize = function _finalize() { var self = this; // Setup the ping interval. @@ -473,7 +474,7 @@ Peer.prototype._finalize = co(function* _finalize() { // Ask for headers-only. if (this.options.headers) { if (this.version.version >= 70012) - yield this.send(new packets.SendHeadersPacket()); + this.send(new packets.SendHeadersPacket()); } // Let them know we support segwit (old @@ -481,31 +482,33 @@ Peer.prototype._finalize = co(function* _finalize() { // of service bits). if (this.options.witness && this.network.oldWitness) { if (this.version.version >= 70012) - yield this.send(new packets.HaveWitnessPacket()); + this.send(new packets.HaveWitnessPacket()); } // We want compact blocks! if (this.options.compact) { if (this.version.version >= 70014) - yield this.sendCompact(); + this.sendCompact(); } // Find some more peers. - yield this.send(new packets.GetAddrPacket()); + this.send(new packets.GetAddrPacket()); // Relay our spv filter if we have one. - yield this.updateWatch(); + this.updateWatch(); // Announce our currently broadcasted items. - yield this.announce(this.pool.invItems.toArray()); + this.announceList(); // Set a fee rate filter. if (this.pool.feeRate !== -1) - yield this.sendFeeRate(this.pool.feeRate); + this.sendFeeRate(this.pool.feeRate); // Start syncing the chain. - yield this.sync(); -}); + this.sync(); + + return Promise.resolve(); +}; /** * Test whether the peer is the loader peer. @@ -517,23 +520,140 @@ Peer.prototype.isLoader = function isLoader() { }; /** - * Broadcast items to peer (transactions or blocks). - * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items + * Broadcast blocks to peer. + * @param {AbstractBlock[]} blocks */ -Peer.prototype.tryAnnounce = function tryAnnounce(items) { - return this.announce(items).catch(util.nop); +Peer.prototype.announceBlock = function announceBlock(blocks) { + var inv = []; + var i, block; + + if (this.destroyed) + return; + + if (!Array.isArray(blocks)) + blocks = [blocks]; + + for (i = 0; i < blocks.length; i++) { + block = blocks[i]; + + assert(block instanceof Block); + + // Don't send if they already have it. + if (this.invFilter.test(block.hash())) + continue; + + // Send them the block immediately if + // they're using compact block mode 1. + if (this.compactMode && this.compactMode.mode === 1) { + this.invFilter.add(block.hash()); + this._sendCompactBlock(block, this.compactWitness); + continue; + } + + // Convert item to block headers + // for peers that request it. + if (this.preferHeaders) { + inv.push(block.toHeaders()); + continue; + } + + inv.push(block.toInv()); + } + + if (this.preferHeaders) { + this.sendHeaders(inv); + return; + } + + this.sendInv(inv); }; /** - * Broadcast items to peer (transactions or blocks). - * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items + * Broadcast transactions to peer. + * @param {TX[]} txs */ -Peer.prototype.announce = co(function* announce(items) { +Peer.prototype.announceTX = function announceTX(txs) { var inv = []; - var headers = []; - var i, item, entry; + var i, tx, entry; + + if (this.destroyed) + return; + + // Do not send txs to spv clients + // that have relay unset. + if (!this.relay) + return; + + if (!Array.isArray(txs)) + txs = [txs]; + + for (i = 0; i < txs.length; i++) { + tx = txs[i]; + + assert(tx instanceof TX); + + // Check the peer's bloom + // filter if they're using spv. + if (this.spvFilter) { + if (!tx.isWatched(this.spvFilter)) + continue; + } + + if (this.feeRate !== -1 && this.mempool) { + entry = this.mempool.getEntry(tx.hash('hex')); + if (entry && entry.getRate() < this.feeRate) + continue; + } + + // Don't send if they already have it. + if (this.invFilter.test(tx.hash())) + continue; + + inv.push(tx.toInv()); + } + + this.sendInv(inv); +}; + +/** + * Announce broadcast list to peer. + */ + +Peer.prototype.announceList = function announceList() { + var txs = []; + var blocks = []; + var item; + + for (item = this.pool.invItems.head; item; item = item.next) { + switch (item.type) { + case constants.inv.TX: + txs.push(item.msg); + break; + case constants.inv.BLOCK: + blocks.push(item.msg); + break; + default: + assert(false, 'Bad item type.'); + break; + } + } + + if (blocks.length > 0) + this.announceBlock(blocks); + + if (txs.length > 0) + this.announceTX(txs); +}; + +/** + * Send inv to a peer. + * @param {InvItem[]} items + */ + +Peer.prototype.sendInv = function sendInv(items) { + var i, item, chunk; if (this.destroyed) return; @@ -543,82 +663,9 @@ Peer.prototype.announce = co(function* announce(items) { for (i = 0; i < items.length; i++) { item = items[i]; - - // Check the peer's bloom - // filter if they're using spv. - if (!this.isWatched(item)) - continue; - - // Send them the block immediately if - // they're using compact block mode 1. - if (this.compactMode && this.compactMode.mode === 1) { - if (item instanceof Block) { - if (!this.invFilter.added(item.hash())) - continue; - yield this._sendCompactBlock(item, this.compactWitness); - continue; - } - } - - // Convert item to block headers - // for peers that request it. - if (this.preferHeaders && item.toHeaders) { - item = item.toHeaders(); - if (this.invFilter.test(item.hash())) - continue; - headers.push(item); - continue; - } - - if (item.toInv) - item = item.toInv(); - - // Do not send txs to spv clients - // that have relay unset. - if (!this.relay) { - if (item.type === constants.inv.TX) - continue; - } - - // Filter according to peer's fee filter. - if (this.feeRate !== -1 && this.mempool) { - if (item.type === constants.inv.TX) { - entry = this.mempool.getEntry(item.hash); - if (entry && entry.getRate() < this.feeRate) - continue; - } - } - - // Don't send if they already have it. - if (this.invFilter.test(item.hash, 'hex')) - continue; - - inv.push(item); + this.invFilter.add(item.hash, 'hex'); } - yield this.sendInv(inv); - - if (headers.length > 0) - yield this.sendHeaders(headers); -}); - -/** - * Send inv to a peer. - * @param {InvItem[]} items - */ - -Peer.prototype.sendInv = co(function* sendInv(items) { - var i, chunk; - - if (this.destroyed) - return; - - if (!Array.isArray(items)) - items = [items]; - - for (i = 0; i < items.length; i++) - this.invFilter.add(items[i].hash, 'hex'); - if (items.length === 0) return; @@ -627,17 +674,17 @@ Peer.prototype.sendInv = co(function* sendInv(items) { for (i = 0; i < items.length; i += 50000) { chunk = items.slice(i, i + 50000); - yield this.send(new packets.InvPacket(chunk)); + this.send(new packets.InvPacket(chunk)); } -}); +}; /** * Send headers to a peer. * @param {Headers[]} items */ -Peer.prototype.sendHeaders = co(function* sendHeaders(items) { - var i, chunk; +Peer.prototype.sendHeaders = function sendHeaders(items) { + var i, item, chunk; if (this.destroyed) return; @@ -645,8 +692,10 @@ Peer.prototype.sendHeaders = co(function* sendHeaders(items) { if (!Array.isArray(items)) items = [items]; - for (i = 0; i < items.length; i++) - this.invFilter.add(items[i].hash()); + for (i = 0; i < items.length; i++) { + item = items[i]; + this.invFilter.add(item.hash()); + } if (items.length === 0) return; @@ -656,28 +705,25 @@ Peer.prototype.sendHeaders = co(function* sendHeaders(items) { for (i = 0; i < items.length; i += 2000) { chunk = items.slice(i, i + 2000); - yield this.send(new packets.HeadersPacket(chunk)); + this.send(new packets.HeadersPacket(chunk)); } -}); +}; /** * Send a `version` packet. */ Peer.prototype.sendVersion = function sendVersion() { - var packet = new packets.VersionPacket({ - version: constants.VERSION, - services: this.pool.services, - ts: this.network.now(), - recv: new NetworkAddress(), - from: this.pool.address, - nonce: this.pool.localNonce, - agent: constants.USER_AGENT, - height: this.chain.height, - relay: this.options.relay - }); - - return this.send(packet); + var packet = new packets.VersionPacket(); + packet.version = constants.VERSION; + packet.services = this.pool.services; + packet.ts = this.network.now(); + packet.from = this.pool.address; + packet.nonce = this.pool.localNonce; + packet.agent = constants.USER_AGENT; + packet.height = this.chain.height; + packet.relay = this.options.relay; + this.send(packet); }; /** @@ -686,43 +732,23 @@ Peer.prototype.sendVersion = function sendVersion() { Peer.prototype.sendPing = function sendPing() { if (!this.version) - return Promise.resolve(); + return; - if (this.version.version <= 60000) - return this.trySend(new packets.PingPacket()); + if (this.version.version <= 60000) { + this.send(new packets.PingPacket()); + return; + } if (this.challenge) { this.logger.debug('Peer has not responded to ping (%s).', this.hostname); this.destroy(); - return Promise.resolve(); + return; } this.lastPing = util.ms(); this.challenge = util.nonce(); - return this.trySend(new packets.PingPacket(this.challenge)); -}; - -/** - * Test whether an is being watched by the peer. - * @param {BroadcastItem|TX} item - * @returns {Boolean} - */ - -Peer.prototype.isWatched = function isWatched(item) { - if (!this.spvFilter) - return true; - - if (!item) - return true; - - if (item instanceof TX) - return item.isWatched(this.spvFilter); - - if (item.msg instanceof TX) - return item.msg.isWatched(this.spvFilter); - - return true; + this.send(new packets.PingPacket(this.challenge)); }; /** @@ -731,9 +757,9 @@ Peer.prototype.isWatched = function isWatched(item) { Peer.prototype.updateWatch = function updateWatch() { if (!this.options.spv) - return Promise.resolve(); + return; - return this.trySend(new packets.FilterLoadPacket(this.pool.spvFilter)); + this.send(new packets.FilterLoadPacket(this.pool.spvFilter)); }; /** @@ -742,7 +768,7 @@ Peer.prototype.updateWatch = function updateWatch() { */ Peer.prototype.sendFeeRate = function sendFeeRate(rate) { - return this.trySend(new packets.FeeFilterPacket(rate)); + this.send(new packets.FeeFilterPacket(rate)); }; /** @@ -755,7 +781,7 @@ Peer.prototype.destroy = function destroy() { if (this.destroyed) return; - this.finishDrain(new Error('Peer destroyed.')); + this.drainSize = 0; this.destroyed = true; this.connected = false; @@ -812,44 +838,40 @@ Peer.prototype.destroy = function destroy() { Peer.prototype.write = function write(data) { if (this.destroyed) - return Promise.reject(new Error('Peer destroyed (write).')); + return; this.lastSend = util.ms(); if (this.socket.write(data) === false) - return this.onDrain(data.length); - - return Promise.resolve(); + this.needsDrain(data.length); }; /** - * Wait for drain. + * Add to drain counter. * @private * @param {Function} resolve * @param {Function} reject */ -Peer.prototype.onDrain = function onDrain(size) { +Peer.prototype.needsDrain = function needsDrain(size) { var self = this; - if (this.maybeStall()) - return Promise.reject(new Error('Peer stalled (drain).')); + if (this.maybeStall()) { + this.error('Peer stalled (drain).'); + return; + } this.drainStart = util.now(); this.drainSize += size; - if (this.drainSize >= (10 << 20)) { + if (this.drainSize >= (5 << 20)) { this.logger.warning( 'Peer is not reading: %s buffered (%s).', util.mb(this.drainSize), this.hostname); - this.error('Peer stalled.'); - return Promise.reject(new Error('Peer stalled (drain).')); + this.error('Peer stalled (drain).'); + return; } - - return new Promise(function(resolve, reject) { - self.drainQueue.push(co.wrap(resolve, reject)); - }); }; /** @@ -858,44 +880,18 @@ Peer.prototype.onDrain = function onDrain(size) { */ Peer.prototype.maybeStall = function maybeStall() { - if (this.drainQueue.length === 0) + if (this.drainSize === 0) return false; if (util.now() < this.drainStart + 10) return false; - this.finishDrain(new Error('Peer stalled.')); + this.drainSize = 0; this.error('Peer stalled.'); return true; }; -/** - * Notify drainers of the latest drain. - * @private - */ - -Peer.prototype.finishDrain = function finishDrain(err) { - var jobs = this.drainQueue.slice(); - var i; - - this.drainQueue.length = 0; - this.drainSize = 0; - - for (i = 0; i < jobs.length; i++) - jobs[i](err); -}; - -/** - * Send a packet (no error handling). - * @param {Packet} packet - * @returns {Promise} - */ - -Peer.prototype.trySend = function trySend(packet) { - return this.send(packet).catch(util.nop); -}; - /** * Send a packet. * @param {Packet} packet @@ -917,7 +913,7 @@ Peer.prototype.send = function send(packet) { } } - return this.sendRaw(packet.cmd, packet.toRaw(), checksum); + this.sendRaw(packet.cmd, packet.toRaw(), checksum); }; /** @@ -928,7 +924,7 @@ Peer.prototype.send = function send(packet) { Peer.prototype.sendRaw = function sendRaw(cmd, body, checksum) { var payload = this.framer.packet(cmd, body, checksum); - return this.write(payload); + this.write(payload); }; /** @@ -1048,7 +1044,7 @@ Peer.prototype.getData = function getData(items) { data[i] = item; } - return this.trySend(new packets.GetDataPacket(data)); + this.send(new packets.GetDataPacket(data)); }; /** @@ -1114,67 +1110,67 @@ Peer.prototype.__onPacket = co(function* onPacket(packet) { case packetTypes.VERSION: return yield this._handleVersion(packet); case packetTypes.VERACK: - return this._handleVerack(packet); + return yield this._handleVerack(packet); case packetTypes.PING: return yield this._handlePing(packet); case packetTypes.PONG: - return this._handlePong(packet); + return yield this._handlePong(packet); case packetTypes.ALERT: return this._handleAlert(packet); case packetTypes.GETADDR: return yield this._handleGetAddr(packet); case packetTypes.ADDR: - return this._handleAddr(packet); + return yield this._handleAddr(packet); case packetTypes.INV: - return this._handleInv(packet); + return yield this._handleInv(packet); case packetTypes.GETDATA: return yield this._handleGetData(packet); case packetTypes.NOTFOUND: - return this._handleNotFound(packet); + return yield this._handleNotFound(packet); case packetTypes.GETBLOCKS: return yield this._handleGetBlocks(packet); case packetTypes.GETHEADERS: return yield this._handleGetHeaders(packet); case packetTypes.HEADERS: - return this._handleHeaders(packet); + return yield this._handleHeaders(packet); case packetTypes.SENDHEADERS: - return this._handleSendHeaders(packet); + return yield this._handleSendHeaders(packet); case packetTypes.BLOCK: - return this._handleBlock(packet); + return yield this._handleBlock(packet); case packetTypes.TX: - return this._handleTX(packet); + return yield this._handleTX(packet); case packetTypes.REJECT: - return this._handleReject(packet); + return yield this._handleReject(packet); case packetTypes.MEMPOOL: return yield this._handleMempool(packet); case packetTypes.FILTERLOAD: - return this._handleFilterLoad(packet); + return yield this._handleFilterLoad(packet); case packetTypes.FILTERADD: - return this._handleFilterAdd(packet); + return yield this._handleFilterAdd(packet); case packetTypes.FILTERCLEAR: - return this._handleFilterClear(packet); + return yield this._handleFilterClear(packet); case packetTypes.MERKLEBLOCK: - return this._handleMerkleBlock(packet); + return yield this._handleMerkleBlock(packet); case packetTypes.GETUTXOS: return yield this._handleGetUTXOs(packet); case packetTypes.UTXOS: - return this._handleUTXOs(packet); + return yield this._handleUTXOs(packet); case packetTypes.HAVEWITNESS: - return this._handleHaveWitness(packet); + return yield this._handleHaveWitness(packet); case packetTypes.FEEFILTER: - return this._handleFeeFilter(packet); + return yield this._handleFeeFilter(packet); case packetTypes.SENDCMPCT: - return this._handleSendCmpct(packet); + return yield this._handleSendCmpct(packet); case packetTypes.CMPCTBLOCK: return yield this._handleCmpctBlock(packet); case packetTypes.GETBLOCKTXN: return yield this._handleGetBlockTxn(packet); case packetTypes.BLOCKTXN: - return this._handleBlockTxn(packet); + return yield this._handleBlockTxn(packet); case packetTypes.ENCINIT: return yield this._handleEncinit(packet); case packetTypes.ENCACK: - return this._handleEncack(packet); + return yield this._handleEncack(packet); case packetTypes.AUTHCHALLENGE: return yield this._handleAuthChallenge(packet); case packetTypes.AUTHREPLY: @@ -1182,7 +1178,7 @@ Peer.prototype.__onPacket = co(function* onPacket(packet) { case packetTypes.AUTHPROPOSE: return yield this._handleAuthPropose(packet); case packetTypes.UNKNOWN: - return this._handleUnknown(packet); + return yield this._handleUnknown(packet); default: assert(false, 'Bad packet type.'); break; @@ -1219,7 +1215,7 @@ Peer.prototype.fire = function fire(cmd, payload) { * @param {FilterLoadPacket} */ -Peer.prototype._handleFilterLoad = function _handleFilterLoad(packet) { +Peer.prototype._handleFilterLoad = co(function* _handleFilterLoad(packet) { if (!packet.isWithinConstraints()) { this.setMisbehavior(100); return; @@ -1227,7 +1223,7 @@ Peer.prototype._handleFilterLoad = function _handleFilterLoad(packet) { this.spvFilter = packet.filter; this.relay = true; -}; +}); /** * Handle `filteradd` packet. @@ -1235,7 +1231,7 @@ Peer.prototype._handleFilterLoad = function _handleFilterLoad(packet) { * @param {FilterAddPacket} */ -Peer.prototype._handleFilterAdd = function _handleFilterAdd(packet) { +Peer.prototype._handleFilterAdd = co(function* _handleFilterAdd(packet) { var data = packet.data; if (data.length > constants.script.MAX_PUSH) { @@ -1247,7 +1243,7 @@ Peer.prototype._handleFilterAdd = function _handleFilterAdd(packet) { this.spvFilter.add(data); this.relay = true; -}; +}); /** * Handle `filterclear` packet. @@ -1255,12 +1251,12 @@ Peer.prototype._handleFilterAdd = function _handleFilterAdd(packet) { * @param {FilterClearPacket} */ -Peer.prototype._handleFilterClear = function _handleFilterClear(packet) { +Peer.prototype._handleFilterClear = co(function* _handleFilterClear(packet) { if (this.spvFilter) this.spvFilter.reset(); this.relay = true; -}; +}); /** * Handle `merkleblock` packet. @@ -1268,7 +1264,7 @@ Peer.prototype._handleFilterClear = function _handleFilterClear(packet) { * @param {MerkleBlockPacket} */ -Peer.prototype._handleMerkleBlock = function _handleMerkleBlock(packet) { +Peer.prototype._handleMerkleBlock = co(function* _handleMerkleBlock(packet) { var block = packet.block; block.verifyPartial(); @@ -1278,7 +1274,7 @@ Peer.prototype._handleMerkleBlock = function _handleMerkleBlock(packet) { if (this.waiting === 0) this._flushMerkle(); -}; +}); /** * Handle `feefilter` packet. @@ -1286,7 +1282,7 @@ Peer.prototype._handleMerkleBlock = function _handleMerkleBlock(packet) { * @param {FeeFilterPacket} */ -Peer.prototype._handleFeeFilter = function _handleFeeFilter(packet) { +Peer.prototype._handleFeeFilter = co(function* _handleFeeFilter(packet) { var rate = packet.rate; if (!(rate >= 0 && rate <= constants.MAX_MONEY)) { @@ -1297,7 +1293,7 @@ Peer.prototype._handleFeeFilter = function _handleFeeFilter(packet) { this.feeRate = rate; this.fire('feefilter', rate); -}; +}); /** * Handle `utxos` packet. @@ -1305,11 +1301,11 @@ Peer.prototype._handleFeeFilter = function _handleFeeFilter(packet) { * @param {UTXOsPacket} */ -Peer.prototype._handleUTXOs = function _handleUTXOs(utxos) { +Peer.prototype._handleUTXOs = co(function* _handleUTXOs(utxos) { this.logger.debug('Received %d utxos (%s).', utxos.coins.length, this.hostname); this.fire('utxos', utxos); -}; +}); /** * Handle `getutxos` packet. @@ -1367,7 +1363,7 @@ Peer.prototype._handleGetUTXOs = co(function* _handleGetUTXOs(packet) { utxos.height = this.chain.height; utxos.tip = this.chain.tip.hash; - yield this.send(utxos); + this.send(utxos); }); /** @@ -1376,10 +1372,10 @@ Peer.prototype._handleGetUTXOs = co(function* _handleGetUTXOs(packet) { * @param {HaveWitnessPacket} */ -Peer.prototype._handleHaveWitness = function _handleHaveWitness(packet) { +Peer.prototype._handleHaveWitness = co(function* _handleHaveWitness(packet) { this.haveWitness = true; this.fire('havewitness'); -}; +}); /** * Handle `getheaders` packet. @@ -1426,7 +1422,7 @@ Peer.prototype._handleGetHeaders = co(function* _handleGetHeaders(packet) { entry = yield entry.getNext(); } - yield this.sendHeaders(headers); + this.sendHeaders(headers); }); /** @@ -1470,7 +1466,7 @@ Peer.prototype._handleGetBlocks = co(function* _handleGetBlocks(packet) { hash = yield this.chain.db.getNextHash(hash); } - yield this.sendInv(blocks); + this.sendInv(blocks); }); /** @@ -1541,7 +1537,7 @@ Peer.prototype._handleVersion = co(function* _handleVersion(version) { this.fire('version', version); - yield this.send(new packets.VerackPacket()); + this.send(new packets.VerackPacket()); }); /** @@ -1550,9 +1546,9 @@ Peer.prototype._handleVersion = co(function* _handleVersion(version) { * @param {VerackPacket} */ -Peer.prototype._handleVerack = function _handleVerack(packet) { +Peer.prototype._handleVerack = co(function* _handleVerack(packet) { this.fire('verack'); -}; +}); /** * Handle `mempool` packet. @@ -1560,7 +1556,7 @@ Peer.prototype._handleVerack = function _handleVerack(packet) { * @param {MempoolPacket} */ -Peer.prototype._handleMempool = function _handleMempool(packet) { +Peer.prototype._handleMempool = co(function* _handleMempool(packet) { var items = []; var i, hashes; @@ -1580,8 +1576,8 @@ Peer.prototype._handleMempool = function _handleMempool(packet) { this.logger.debug('Sending mempool snapshot (%s).', this.hostname); - return this.sendInv(items); -}; + this.sendInv(items); +}); /** * Get a block/tx from the broadcast map. @@ -1659,7 +1655,7 @@ Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) { // Check for a broadcasted item first. if (block) { - yield this.send(new packets.BlockPacket(block, witness)); + this.send(new packets.BlockPacket(block, witness)); return true; } @@ -1687,7 +1683,7 @@ Peer.prototype._sendBlock = co(function* _sendBlock(item, witness) { if (!block) return false; - yield this.send(new packets.BlockPacket(block, witness)); + this.send(new packets.BlockPacket(block, witness)); return true; }); @@ -1711,7 +1707,7 @@ Peer.prototype._sendCompactBlock = function _sendCompactBlock(block, witness) { break; } - return this.send(new packets.CmpctBlockPacket(block, witness)); + this.send(new packets.CmpctBlockPacket(block, witness)); }; /** @@ -1752,7 +1748,7 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { continue; } - yield this.send(new packets.TXPacket(tx, item.hasWitness())); + this.send(new packets.TXPacket(tx, item.hasWitness())); txs++; @@ -1785,11 +1781,11 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { block = block.toMerkle(this.spvFilter); - yield this.send(new packets.MerkleBlockPacket(block)); + this.send(new packets.MerkleBlockPacket(block)); for (j = 0; j < block.txs.length; j++) { tx = block.txs[j]; - yield this.send(new packets.TXPacket(tx, item.hasWitness())); + this.send(new packets.TXPacket(tx, item.hasWitness())); txs++; } @@ -1828,13 +1824,13 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { } if (item.hash === this.hashContinue) { - yield this.sendInv(new InvItem(constants.inv.BLOCK, this.chain.tip.hash)); + this.sendInv([new InvItem(constants.inv.BLOCK, this.chain.tip.hash)]); this.hashContinue = null; } } if (notFound.length > 0) - yield this.send(new packets.NotFoundPacket(notFound)); + this.send(new packets.NotFoundPacket(notFound)); if (txs > 0) { this.logger.debug( @@ -1861,9 +1857,9 @@ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { * @param {NotFoundPacket} */ -Peer.prototype._handleNotFound = function _handleNotFound(packet) { +Peer.prototype._handleNotFound = co(function* _handleNotFound(packet) { this.fire('notfound', packet.items); -}; +}); /** * Handle `addr` packet. @@ -1871,7 +1867,7 @@ Peer.prototype._handleNotFound = function _handleNotFound(packet) { * @param {AddrPacket} */ -Peer.prototype._handleAddr = function _handleAddr(packet) { +Peer.prototype._handleAddr = co(function* _handleAddr(packet) { var now = this.network.now(); var addrs = packet.items; var i, addr; @@ -1893,7 +1889,7 @@ Peer.prototype._handleAddr = function _handleAddr(packet) { this.hostname); this.fire('addr', addrs); -}; +}); /** * Handle `ping` packet. @@ -1904,7 +1900,7 @@ Peer.prototype._handleAddr = function _handleAddr(packet) { Peer.prototype._handlePing = co(function* _handlePing(packet) { this.fire('ping', this.minPing); if (packet.nonce) - yield this.send(new packets.PongPacket(packet.nonce)); + this.send(new packets.PongPacket(packet.nonce)); }); /** @@ -1913,7 +1909,7 @@ Peer.prototype._handlePing = co(function* _handlePing(packet) { * @param {PongPacket} */ -Peer.prototype._handlePong = function _handlePong(packet) { +Peer.prototype._handlePong = co(function* _handlePong(packet) { var nonce = packet.nonce; var now = util.ms(); @@ -1944,7 +1940,7 @@ Peer.prototype._handlePong = function _handlePong(packet) { this.challenge = null; this.fire('pong', this.minPing); -}; +}); /** * Handle `getaddr` packet. @@ -1989,7 +1985,7 @@ Peer.prototype._handleGetAddr = co(function* _handleGetAddr(packet) { items.length, this.hostname); - yield this.send(new packets.AddrPacket(items)); + this.send(new packets.AddrPacket(items)); }); /** @@ -1998,7 +1994,7 @@ Peer.prototype._handleGetAddr = co(function* _handleGetAddr(packet) { * @param {InvPacket} */ -Peer.prototype._handleInv = function _handleInv(packet) { +Peer.prototype._handleInv = co(function* _handleInv(packet) { var items = packet.items; var blocks = []; var txs = []; @@ -2043,7 +2039,7 @@ Peer.prototype._handleInv = function _handleInv(packet) { 'Peer sent an unknown inv type: %d (%s).', unknown, this.hostname); } -}; +}); /** * Handle `headers` packet. @@ -2051,7 +2047,7 @@ Peer.prototype._handleInv = function _handleInv(packet) { * @param {HeadersPacket} */ -Peer.prototype._handleHeaders = function _handleHeaders(packet) { +Peer.prototype._handleHeaders = co(function* _handleHeaders(packet) { var headers = packet.items; this.logger.debug( @@ -2064,7 +2060,7 @@ Peer.prototype._handleHeaders = function _handleHeaders(packet) { } this.fire('headers', headers); -}; +}); /** * Handle `sendheaders` packet. @@ -2072,10 +2068,10 @@ Peer.prototype._handleHeaders = function _handleHeaders(packet) { * @param {SendHeadersPacket} */ -Peer.prototype._handleSendHeaders = function _handleSendHeaders(packet) { +Peer.prototype._handleSendHeaders = co(function* _handleSendHeaders(packet) { this.preferHeaders = true; this.fire('sendheaders'); -}; +}); /** * Handle `block` packet. @@ -2083,9 +2079,9 @@ Peer.prototype._handleSendHeaders = function _handleSendHeaders(packet) { * @param {BlockPacket} */ -Peer.prototype._handleBlock = function _handleBlock(packet) { +Peer.prototype._handleBlock = co(function* _handleBlock(packet) { this.fire('block', packet.block); -}; +}); /** * Handle `tx` packet. @@ -2093,7 +2089,7 @@ Peer.prototype._handleBlock = function _handleBlock(packet) { * @param {TXPacket} */ -Peer.prototype._handleTX = function _handleTX(packet) { +Peer.prototype._handleTX = co(function* _handleTX(packet) { var tx = packet.tx; if (this.lastBlock) { @@ -2106,7 +2102,7 @@ Peer.prototype._handleTX = function _handleTX(packet) { } this.fire('tx', tx); -}; +}); /** * Handle `reject` packet. @@ -2114,7 +2110,7 @@ Peer.prototype._handleTX = function _handleTX(packet) { * @param {RejectPacket} */ -Peer.prototype._handleReject = function _handleReject(details) { +Peer.prototype._handleReject = co(function* _handleReject(details) { var hash, entry; this.fire('reject', details); @@ -2129,7 +2125,7 @@ Peer.prototype._handleReject = function _handleReject(details) { return; entry.reject(this); -}; +}); /** * Handle `alert` packet. @@ -2137,10 +2133,10 @@ Peer.prototype._handleReject = function _handleReject(details) { * @param {AlertPacket} */ -Peer.prototype._handleAlert = function _handleAlert(alert) { +Peer.prototype._handleAlert = co(function* _handleAlert(alert) { this.invFilter.add(alert.hash()); this.fire('alert', alert); -}; +}); /** * Handle `encinit` packet. @@ -2156,7 +2152,7 @@ Peer.prototype._handleEncinit = co(function* _handleEncinit(packet) { this.fire('encinit', packet); - yield this.send(this.bip151.toEncack()); + this.send(this.bip151.toEncack()); }); /** @@ -2165,14 +2161,14 @@ Peer.prototype._handleEncinit = co(function* _handleEncinit(packet) { * @param {EncackPacket} */ -Peer.prototype._handleEncack = function _handleEncack(packet) { +Peer.prototype._handleEncack = co(function* _handleEncack(packet) { if (!this.bip151) return; this.bip151.encack(packet.publicKey); this.fire('encack', packet); -}; +}); /** * Handle `authchallenge` packet. @@ -2190,7 +2186,7 @@ Peer.prototype._handleAuthChallenge = co(function* _handleAuthChallenge(packet) this.fire('authchallenge', packet.hash); - yield this.send(new packets.AuthReplyPacket(sig)); + this.send(new packets.AuthReplyPacket(sig)); }); /** @@ -2208,7 +2204,7 @@ Peer.prototype._handleAuthReply = co(function* _handleAuthReply(packet) { hash = this.bip150.reply(packet.signature); if (hash) - yield this.send(new packets.AuthProposePacket(hash)); + this.send(new packets.AuthProposePacket(hash)); this.fire('authreply', packet.signature); }); @@ -2227,7 +2223,7 @@ Peer.prototype._handleAuthPropose = co(function* _handleAuthPropose(packet) { hash = this.bip150.propose(packet.hash); - yield this.send(new packets.AuthChallengePacket(hash)); + this.send(new packets.AuthChallengePacket(hash)); this.fire('authpropose', packet.hash); }); @@ -2238,10 +2234,10 @@ Peer.prototype._handleAuthPropose = co(function* _handleAuthPropose(packet) { * @param {UnknownPacket} */ -Peer.prototype._handleUnknown = function _handleUnknown(packet) { +Peer.prototype._handleUnknown = co(function* _handleUnknown(packet) { this.logger.warning('Unknown packet: %s.', packet.cmd); this.fire('unknown', packet); -}; +}); /** * Handle `sendcmpct` packet. @@ -2249,7 +2245,7 @@ Peer.prototype._handleUnknown = function _handleUnknown(packet) { * @param {SendCmpctPacket} */ -Peer.prototype._handleSendCmpct = function _handleSendCmpct(packet) { +Peer.prototype._handleSendCmpct = co(function* _handleSendCmpct(packet) { var max = this.options.witness ? 2 : 1; if (packet.version > max) { @@ -2277,7 +2273,7 @@ Peer.prototype._handleSendCmpct = function _handleSendCmpct(packet) { this.compactMode = packet; this.compactWitness = packet.version === 2; this.fire('sendcmpct', packet); -}; +}); /** * Handle `cmpctblock` packet. @@ -2328,7 +2324,7 @@ Peer.prototype._handleCmpctBlock = co(function* _handleCmpctBlock(packet) { this.compactBlocks[hash] = block; - yield this.send(new packets.GetBlockTxnPacket(block.toRequest())); + this.send(new packets.GetBlockTxnPacket(block.toRequest())); this.logger.debug( 'Received semi-full compact block %s (%s).', @@ -2387,7 +2383,7 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) { res = BIP152.TXResponse.fromBlock(block, req); - yield this.send(new packets.BlockTxnPacket(res, this.compactWitness)); + this.send(new packets.BlockTxnPacket(res, this.compactWitness)); this.fire('blocktxn', req); }); @@ -2398,7 +2394,7 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) { * @param {BlockTxnPacket} */ -Peer.prototype._handleBlockTxn = function _handleBlockTxn(packet) { +Peer.prototype._handleBlockTxn = co(function* _handleBlockTxn(packet) { var res = packet.response; var block = this.compactBlocks[res.hash]; @@ -2423,7 +2419,7 @@ Peer.prototype._handleBlockTxn = function _handleBlockTxn(packet) { this.fire('block', block.toBlock()); this.fire('getblocktxn', res); -}; +}); /** * Send an `alert` to peer. @@ -2432,9 +2428,9 @@ Peer.prototype._handleBlockTxn = function _handleBlockTxn(packet) { Peer.prototype.sendAlert = function sendAlert(alert) { if (!this.invFilter.added(alert.hash())) - return Promise.resolve(); + return; - return this.trySend(alert); + this.send(alert); }; /** @@ -2465,7 +2461,7 @@ Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) { 'Height: %d, Hash: %s, Stop: %s', height, hash, stop || null); - return this.send(packet); + this.send(packet); }; /** @@ -2495,7 +2491,7 @@ Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) { 'Height: %d, Hash: %s, Stop: %s', height, hash, stop || null); - return this.send(packet); + this.send(packet); }; /** @@ -2504,20 +2500,20 @@ Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) { Peer.prototype.sendMempool = function sendMempool() { if (!this.version) - return Promise.resolve(); + return; if (!this.version.hasBloom()) { this.logger.debug( 'Cannot request mempool for non-bloom peer (%s).', this.hostname); - return Promise.resolve(); + return; } this.logger.debug( 'Requesting inv packet from peer with mempool (%s).', this.hostname); - return this.trySend(new packets.MempoolPacket()); + this.send(new packets.MempoolPacket()); }; /** @@ -2542,7 +2538,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { 'Sending reject packet to peer (%s).', this.hostname); - return this.trySend(reject); + this.send(reject); }; /** @@ -2552,7 +2548,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { Peer.prototype.sendCompact = function sendCompact() { var version = this.options.witness ? 2 : 1; this.logger.info('Initializing compact blocks (%s).', this.hostname); - return this.trySend(new packets.SendCmpctPacket(0, version)); + this.send(new packets.SendCmpctPacket(0, version)); }; /** @@ -2600,10 +2596,9 @@ Peer.prototype.ignore = function ignore() { */ Peer.prototype.reject = function reject(obj, code, reason, score) { - var promise = this.sendReject(code, reason, obj); + this.sendReject(code, reason, obj); if (score > 0) this.setMisbehavior(score); - return promise; }; /** @@ -2628,7 +2623,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { return; } - yield this.sendGetBlocks(locator, root); + this.sendGetBlocks(locator, root); }); /** @@ -2640,7 +2635,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { var locator = yield this.chain.getLocator(tip); - yield this.sendGetHeaders(locator, stop); + this.sendGetHeaders(locator, stop); }); /** @@ -2652,18 +2647,9 @@ Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) { var locator = yield this.chain.getLocator(tip); - yield this.sendGetBlocks(locator, stop); + this.sendGetBlocks(locator, stop); }); -/** - * Start syncing from peer. - * @returns {Promise} - */ - -Peer.prototype.trySync = function trySync() { - return this.sync().catch(util.nop); -}; - /** * Start syncing from peer. * @returns {Promise} @@ -2695,7 +2681,7 @@ Peer.prototype.sync = co(function* sync() { // Ask for the mempool if we're synced. if (this.network.requestMempool) { if (this.isLoader() && this.chain.synced) - yield this.sendMempool(); + this.sendMempool(); } this.syncSent = true; diff --git a/lib/net/pool.js b/lib/net/pool.js index 2c37f87b..37ded217 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -154,6 +154,9 @@ function Pool(options) { this.timeout = null; this.interval = null; + this._onTimeout = this.onTimeout.bind(this); + this._onInterval = this.onInterval.bind(this); + this._initOptions(); this._init(); }; @@ -372,7 +375,7 @@ Pool.prototype.connect = function connect() { if (!this.options.selfish && !this.options.spv) { if (this.mempool) { this.mempool.on('tx', function(tx) { - self.announce(tx); + self.announceTX(tx); }); } @@ -385,7 +388,7 @@ Pool.prototype.connect = function connect() { this.chain.on('block', function(block) { if (!self.chain.synced) return; - self.announce(block); + self.announceBlock(block); }); } @@ -503,24 +506,8 @@ Pool.prototype._handleLeech = function _handleLeech(socket) { */ Pool.prototype.startTimeout = function startTimeout() { - var self = this; - - function destroy() { - if (!self.syncing) - return; - - if (self.chain.synced) - return; - - if (self.peers.load) { - self.peers.load.destroy(); - self.logger.debug('Timer ran out. Finding new loader peer.'); - } - } - this.stopTimeout(); - - this.timeout = setTimeout(destroy, this.loadTimeout); + this.timeout = setTimeout(this._onTimeout, this.loadTimeout); }; /** @@ -535,6 +522,24 @@ Pool.prototype.stopTimeout = function stopTimeout() { } }; +/** + * Potentially kill a stalling peer. + * @private + */ + +Pool.prototype.onTimeout = function onTimeout() { + if (!this.syncing) + return; + + if (this.chain.synced) + return; + + if (this.peers.load) { + this.peers.load.destroy(); + this.logger.debug('Timer ran out. Finding new loader peer.'); + } +}; + /** * Start the stall interval (shorter than the * stall timeout, inteded to give warnings and @@ -544,27 +549,8 @@ Pool.prototype.stopTimeout = function stopTimeout() { */ Pool.prototype.startInterval = function startInterval() { - var self = this; - - function load() { - var peer = self.peers.load; - var hostname = peer ? peer.hostname : null; - - if (!self.syncing) - return; - - if (self.chain.synced) - return self.stopInterval(); - - if (self.chain.isBusy()) - return self.startTimeout(); - - self.logger.warning('Loader peer is stalling (%s).', hostname); - } - this.stopInterval(); - - this.interval = setInterval(load, this.loadTimeout / 6 | 0); + this.interval = setInterval(this._onInterval, this.loadTimeout / 6 | 0); }; /** @@ -579,6 +565,27 @@ Pool.prototype.stopInterval = function stopInterval() { } }; +/** + * Warn that the loader peer is stalling. + * @private + */ + +Pool.prototype.onInterval = function onInterval() { + var peer = this.peers.load; + var hostname = peer ? peer.hostname : null; + + if (!this.syncing) + return; + + if (this.chain.synced) + return this.stopInterval(); + + if (this.chain.isBusy()) + return this.startTimeout(); + + this.logger.warning('Loader peer is stalling (%s).', hostname); +}; + /** * Add a loader peer. Necessary for * a sync to even begin. @@ -645,7 +652,7 @@ Pool.prototype.setLoader = function setLoader(peer) { this.peers.repurpose(peer); this.fillPeers(); - peer.trySync(); + peer.sync(); util.nextTick(function() { self.emit('loader', peer); @@ -683,7 +690,7 @@ Pool.prototype.sync = function sync() { for (peer = this.peers.head(); peer; peer = peer.next) { if (!peer.outbound || peer.pending) continue; - peer.trySync(); + peer.sync(); } }; @@ -699,7 +706,7 @@ Pool.prototype.forceSync = function forceSync() { if (!peer.outbound || peer.pending) continue; peer.syncSent = false; - peer.trySync(); + peer.sync(); } }; @@ -1753,20 +1760,13 @@ Pool.prototype.fulfill = function fulfill(data) { /** * Broadcast a transaction or block. - * @param {TX|Block|InvItem} msg + * @param {TX|Block} msg * @returns {Promise} - * or timeout. - * @returns {BroadcastItem} */ Pool.prototype.broadcast = function broadcast(msg) { - var hash = msg.hash; - var item; - - if (msg.toInv) - hash = msg.toInv().hash; - - item = this.invMap[hash]; + var hash = msg.hash('hex'); + var item = this.invMap[hash]; if (item) { item.refresh(); @@ -1783,19 +1783,32 @@ Pool.prototype.broadcast = function broadcast(msg) { }; /** - * Announce an item by sending an inv to all - * peers. This does not add it to the broadcast - * queue. - * @param {TX|Block} tx + * Announce a block to all peers. + * @param {Block} tx */ -Pool.prototype.announce = function announce(msg) { +Pool.prototype.announceBlock = function announceBlock(msg) { var peer; for (peer = this.peers.head(); peer; peer = peer.next) { if (!peer.outbound || peer.pending) continue; - peer.tryAnnounce(msg); + peer.announceBlock(msg); + } +}; + +/** + * Announce a transaction to all peers. + * @param {TX} tx + */ + +Pool.prototype.announceTX = function announceTX(msg) { + var peer; + + for (peer = this.peers.head(); peer; peer = peer.next) { + if (!peer.outbound || peer.pending) + continue; + peer.announceTX(msg); } }; @@ -2180,16 +2193,16 @@ HostList.prototype.add = function add(addr) { */ HostList.prototype.remove = function remove(addr) { - addr = this.map[addr.hostname]; + var item = this.map[addr.hostname]; - if (!addr) + if (!item) return; - util.binaryRemove(this.items, addr, compare); + util.binaryRemove(this.items, item, compare); - delete this.map[addr.hostname]; + delete this.map[item.hostname]; - return addr; + return item; }; /** @@ -2199,7 +2212,7 @@ HostList.prototype.remove = function remove(addr) { HostList.prototype.ban = function ban(addr) { this.misbehaving[addr.host] = util.now(); - this.remove(addr); + return this.remove(addr); }; /** @@ -2303,8 +2316,8 @@ function LoadRequest(pool, peer, type, hash) { this.timeout = null; this.onTimeout = this._onTimeout.bind(this); - this.next = null; this.prev = null; + this.next = null; assert(!this.pool.requestMap[this.hash]); this.pool.requestMap[this.hash] = this; @@ -2408,39 +2421,31 @@ LoadRequest.prototype.toInv = function toInv() { * @constructor * @private * @param {Pool} pool - * @param {TX|Block|InvItem} item + * @param {TX|Block} msg * @emits BroadcastItem#ack * @emits BroadcastItem#reject * @emits BroadcastItem#timeout */ -function BroadcastItem(pool, item) { +function BroadcastItem(pool, msg) { + var item; + if (!(this instanceof BroadcastItem)) - return new BroadcastItem(pool, item); + return new BroadcastItem(pool, msg); + + assert(!msg.mutable, 'Cannot broadcast mutable item.'); + + item = msg.toInv(); this.pool = pool; - this.callback = []; - - this.id = this.pool.uid++; - this.msg = null; - - if (item instanceof TX) - assert(!item.mutable, 'Cannot broadcast mutable TX.'); - - if (item.toInv) { - this.msg = item; - item = item.toInv(); - } + this.msg = msg; this.hash = item.hash; this.type = item.type; + this.callback = []; - assert(this.type != null); - assert(typeof this.hash === 'string'); - - // INV does not set the witness - // mask (only GETDATA does this). - assert((this.type & constants.WITNESS_MASK) === 0); + this.prev = null; + this.next = null; } util.inherits(BroadcastItem, EventEmitter); @@ -2493,7 +2498,17 @@ BroadcastItem.prototype.refresh = function refresh() { */ BroadcastItem.prototype.announce = function announce() { - return this.pool.announce(this); + switch (this.type) { + case constants.inv.TX: + this.pool.announceTX(this.msg); + break; + case constants.inv.BLOCK: + this.pool.announceBlock(this.msg); + break; + default: + assert(false, 'Bad type.'); + break; + } }; /** @@ -2567,15 +2582,6 @@ BroadcastItem.prototype.inspect = function inspect() { + '>'; }; -/** - * Convert broadcast item to an inv item. - * @returns {InvItem} - */ - -BroadcastItem.prototype.toInv = function toInv() { - return new InvItem(this.type, this.hash); -}; - /* * Helpers */