diff --git a/lib/net/packets.js b/lib/net/packets.js index 175f927e..130b917b 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -1639,10 +1639,14 @@ function FilterLoadPacket(filter, n, tweak, update) { Packet.call(this); - this.filter = filter || DUMMY; - this.n = n || 0; - this.tweak = tweak || 0; - this.update = update || 0; + if (filter instanceof bcoin.bloom) { + this.fromFilter(filter); + } else { + this.filter = filter || DUMMY; + this.n = n || 0; + this.tweak = tweak || 0; + this.update = update || 0; + } } utils.inherits(FilterLoadPacket, Packet); diff --git a/lib/net/peer.js b/lib/net/peer.js index 5b83a4be..c5d9864e 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -207,9 +207,13 @@ Peer.prototype._init = function init() { self.parser.feed(chunk); }); - this.parser.on('packet', function(packet) { - self._onPacket(packet); - }); + this.parser.on('packet', co(function *(packet) { + try { + yield self._onPacket(packet); + } catch (e) { + self.error(e); + } + })); this.parser.on('error', function(err) { self.error(err, true); @@ -486,7 +490,7 @@ Peer.prototype.isLoader = function isLoader() { * @param {Block[]|TX[]|InvItem[]|BroadcastEntry[]} items */ -Peer.prototype.announce = function announce(items) { +Peer.prototype.announce = co(function* announce(items) { var inv = []; var headers = []; var i, item, entry; @@ -541,18 +545,18 @@ Peer.prototype.announce = function announce(items) { inv.push(item); } - this.sendInv(inv); + yield this.sendInv(inv); if (headers.length > 0) - this.sendHeaders(headers); -}; + yield this.sendHeaders(headers); +}); /** * Send inv to a peer. * @param {InvItem[]} items */ -Peer.prototype.sendInv = function sendInv(items) { +Peer.prototype.sendInv = co(function* sendInv(items) { var i, chunk; if (this.destroyed) @@ -572,16 +576,16 @@ Peer.prototype.sendInv = function sendInv(items) { for (i = 0; i < items.length; i += 50000) { chunk = items.slice(i, i + 50000); - this.send(new packets.InvPacket(chunk)); + yield this.send(new packets.InvPacket(chunk)); } -}; +}); /** * Send headers to a peer. * @param {Headers[]} items */ -Peer.prototype.sendHeaders = function sendHeaders(items) { +Peer.prototype.sendHeaders = co(function* sendHeaders(items) { var i, chunk; if (this.destroyed) @@ -601,9 +605,9 @@ Peer.prototype.sendHeaders = function sendHeaders(items) { for (i = 0; i < items.length; i += 2000) { chunk = items.slice(i, i + 2000); - this.send(new packets.HeadersPacket(chunk)); + yield this.send(new packets.HeadersPacket(chunk)); } -}; +}); /** * Send a `version` packet. @@ -622,7 +626,7 @@ Peer.prototype.sendVersion = function sendVersion() { relay: this.options.relay }); - this.send(packet); + return this.send(packet); }; /** @@ -633,20 +637,18 @@ Peer.prototype.sendPing = function sendPing() { if (!this.version) return; - if (this.version.version <= 60000) { - this.send(new packets.PingPacket()); - return; - } + if (this.version.version <= 60000) + return this.send(new packets.PingPacket()); if (this.challenge) { this.logger.debug('Peer has not responded to ping (%s).', this.hostname); - return; + return Promise.resolve(null); } this.lastPing = utils.ms(); this.challenge = utils.nonce(); - this.send(new packets.PingPacket(this.challenge)); + return this.send(new packets.PingPacket(this.challenge)); }; /** @@ -677,9 +679,9 @@ Peer.prototype.isWatched = function isWatched(item) { Peer.prototype.updateWatch = function updateWatch() { if (!this.options.spv) - return; + return Promise.resolve(null); - this.send(packets.FilterLoadPacket.fromFilter(this.pool.spvFilter)); + return this.send(new packets.FilterLoadPacket(this.pool.spvFilter)); }; /** @@ -688,7 +690,7 @@ Peer.prototype.updateWatch = function updateWatch() { */ Peer.prototype.sendFeeRate = function sendFeeRate(rate) { - this.send(new packets.FeeFilterPacket(rate)); + return this.send(new packets.FeeFilterPacket(rate)); }; /** @@ -757,9 +759,13 @@ Peer.prototype.write = function write(data) { this.lastSend = utils.ms(); - return new Promise(function(resolve, reject) { - self.socket.write(data, wrap(resolve, reject)); - }); + if (this.socket.write(data) === false) { + return new Promise(function(resolve, reject) { + self.socket.once('drain', resolve); + }); + } + + return Promise.resolve(null); }; /** @@ -788,24 +794,6 @@ Peer.prototype.send = function send(packet) { return this.write(payload); }; -/** - * Send a packet. Throttle reads and wait for flush. - * @param {Packet} packet - * @returns {Promise} - */ - -Peer.prototype.flush = co(function* flush(packet) { - assert(this.outgoing >= 0); - - if (this.outgoing++ === 0) - this.socket.pause(); - - yield this.send(packet); - - if (--this.outgoing === 0) - this.socket.resume(); -}); - /** * Emit an error and destroy the peer. * @private @@ -923,7 +911,7 @@ Peer.prototype.getData = function getData(items) { data[i] = item; } - this.send(new packets.GetDataPacket(data)); + return this.send(new packets.GetDataPacket(data)); }; /** @@ -932,7 +920,26 @@ Peer.prototype.getData = function getData(items) { * @param {Packet} packet */ -Peer.prototype._onPacket = function onPacket(packet) { +Peer.prototype._onPacket = co(function* onPacket(packet) { + var unlock = yield this.locker.lock(); + + this.socket.pause(); + + try { + return yield this.__onPacket(packet); + } finally { + this.socket.resume(); + unlock(); + } +}); + +/** + * Handle a packet payload without a lock. + * @private + * @param {Packet} packet + */ + +Peer.prototype.__onPacket = co(function* onPacket(packet) { this.lastRecv = utils.ms(); if (this.bip151 @@ -957,29 +964,29 @@ Peer.prototype._onPacket = function onPacket(packet) { switch (packet.type) { case packetTypes.VERSION: - return this._handleVersion(packet); + return yield this._handleVersion(packet); case packetTypes.VERACK: return this._handleVerack(packet); case packetTypes.PING: - return this._handlePing(packet); + return yield this._handlePing(packet); case packetTypes.PONG: return this._handlePong(packet); case packetTypes.ALERT: return this._handleAlert(packet); case packetTypes.GETADDR: - return this._handleGetAddr(packet); + return yield this._handleGetAddr(packet); case packetTypes.ADDR: return this._handleAddr(packet); case packetTypes.INV: return this._handleInv(packet); case packetTypes.GETDATA: - return this._handleGetData(packet); + return yield this._handleGetData(packet); case packetTypes.NOTFOUND: return this._handleNotFound(packet); case packetTypes.GETBLOCKS: - return this._handleGetBlocks(packet); + return yield this._handleGetBlocks(packet); case packetTypes.GETHEADERS: - return this._handleGetHeaders(packet); + return yield this._handleGetHeaders(packet); case packetTypes.HEADERS: return this._handleHeaders(packet); case packetTypes.SENDHEADERS: @@ -991,7 +998,7 @@ Peer.prototype._onPacket = function onPacket(packet) { case packetTypes.REJECT: return this._handleReject(packet); case packetTypes.MEMPOOL: - return this._handleMempool(packet); + return yield this._handleMempool(packet); case packetTypes.FILTERLOAD: return this._handleFilterLoad(packet); case packetTypes.FILTERADD: @@ -1001,7 +1008,7 @@ Peer.prototype._onPacket = function onPacket(packet) { case packetTypes.MERKLEBLOCK: return this._handleMerkleBlock(packet); case packetTypes.GETUTXOS: - return this._handleGetUTXOs(packet); + return yield this._handleGetUTXOs(packet); case packetTypes.UTXOS: return this._handleUTXOs(packet); case packetTypes.HAVEWITNESS: @@ -1011,28 +1018,28 @@ Peer.prototype._onPacket = function onPacket(packet) { case packetTypes.SENDCMPCT: return this._handleSendCmpct(packet); case packetTypes.CMPCTBLOCK: - return this._handleCmpctBlock(packet); + return yield this._handleCmpctBlock(packet); case packetTypes.GETBLOCKTXN: - return this._handleGetBlockTxn(packet); + return yield this._handleGetBlockTxn(packet); case packetTypes.BLOCKTXN: return this._handleBlockTxn(packet); case packetTypes.ENCINIT: - return this._handleEncinit(packet); + return yield this._handleEncinit(packet); case packetTypes.ENCACK: return this._handleEncack(packet); case packetTypes.AUTHCHALLENGE: - return this._handleAuthChallenge(packet); + return yield this._handleAuthChallenge(packet); case packetTypes.AUTHREPLY: - return this._handleAuthReply(packet); + return yield this._handleAuthReply(packet); case packetTypes.AUTHPROPOSE: - return this._handleAuthPropose(packet); + return yield this._handleAuthPropose(packet); case packetTypes.UNKNOWN: return this._handleUnknown(packet); default: assert(false, 'Bad packet type.'); break; } -}; +}); /** * Flush merkle block once all matched @@ -1162,23 +1169,6 @@ Peer.prototype._handleUTXOs = function _handleUTXOs(utxos) { */ Peer.prototype._handleGetUTXOs = co(function* _handleGetUTXOs(packet) { - var unlock = yield this.locker.lock(); - try { - return yield this.__handleGetUTXOs(packet); - } catch (e) { - this.emit('error', e); - } finally { - unlock(); - } -}); - -/** - * Handle `getheaders` packet without lock. - * @private - * @param {GetHeadersPacket} - */ - -Peer.prototype.__handleGetUTXOs = co(function* _handleGetUTXOs(packet) { var i, utxos, prevout, hash, index, coin; if (!this.chain.synced) @@ -1229,7 +1219,7 @@ Peer.prototype.__handleGetUTXOs = co(function* _handleGetUTXOs(packet) { utxos.height = this.chain.height; utxos.tip = this.chain.tip.hash; - this.send(utxos); + yield this.send(utxos); }); /** @@ -1250,23 +1240,6 @@ Peer.prototype._handleHaveWitness = function _handleHaveWitness(packet) { */ Peer.prototype._handleGetHeaders = co(function* _handleGetHeaders(packet) { - var unlock = yield this.locker.lock(); - try { - return yield this.__handleGetHeaders(packet); - } catch (e) { - this.emit('error', e); - } finally { - unlock(); - } -}); - -/** - * Handle `getheaders` packet without lock. - * @private - * @param {GetHeadersPacket} - */ - -Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) { var headers = []; var hash, entry; @@ -1305,7 +1278,7 @@ Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) { entry = yield entry.getNext(); } - this.sendHeaders(headers); + yield this.sendHeaders(headers); }); /** @@ -1315,23 +1288,6 @@ Peer.prototype.__handleGetHeaders = co(function* _handleGetHeaders(packet) { */ Peer.prototype._handleGetBlocks = co(function* _handleGetBlocks(packet) { - var unlock = yield this.locker.lock(); - try { - return yield this.__handleGetBlocks(packet); - } catch (e) { - this.emit('error', e); - } finally { - unlock(); - } -}); - -/** - * Handle `getblocks` packet without lock. - * @private - * @param {GetBlocksPacket} - */ - -Peer.prototype.__handleGetBlocks = co(function* _handleGetBlocks(packet) { var blocks = []; var hash; @@ -1366,7 +1322,7 @@ Peer.prototype.__handleGetBlocks = co(function* _handleGetBlocks(packet) { hash = yield this.chain.db.getNextHash(hash); } - this.sendInv(blocks); + yield this.sendInv(blocks); }); /** @@ -1439,8 +1395,9 @@ Peer.prototype._handleVersion = co(function* _handleVersion(version) { this.relay = version.relay; this.version = version; - this.send(new packets.VerackPacket()); this.fire('version', version); + + yield this.send(new packets.VerackPacket()); }); /** @@ -1479,7 +1436,7 @@ Peer.prototype._handleMempool = function _handleMempool(packet) { this.logger.debug('Sending mempool snapshot (%s).', this.hostname); - this.sendInv(items); + return this.sendInv(items); }; /** @@ -1539,23 +1496,6 @@ Peer.prototype._getItem = co(function* _getItem(item) { */ Peer.prototype._handleGetData = co(function* _handleGetData(packet) { - var unlock = yield this.locker.lock(); - try { - return yield this.__handleGetData(packet); - } catch (e) { - this.emit('error', e); - } finally { - unlock(); - } -}); - -/** - * Handle `getdata` packet without lock. - * @private - * @param {GetDataPacket} - */ - -Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { var notFound = []; var items = packet.items; var i, j, item, entry, tx, block; @@ -1587,7 +1527,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { continue; } - yield this.flush(new packets.TXPacket(tx, item.hasWitness())); + yield this.send(new packets.TXPacket(tx, item.hasWitness())); continue; } @@ -1597,7 +1537,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { switch (item.type) { case constants.inv.BLOCK: case constants.inv.WITNESS_BLOCK: - yield this.flush(new packets.BlockPacket(block, item.hasWitness())); + yield this.send(new packets.BlockPacket(block, item.hasWitness())); break; case constants.inv.FILTERED_BLOCK: case constants.inv.WITNESS_FILTERED_BLOCK: @@ -1608,18 +1548,18 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { block = block.toMerkle(this.spvFilter); - yield this.flush(new packets.MerkleBlockPacket(block)); + yield this.send(new packets.MerkleBlockPacket(block)); for (j = 0; j < block.txs.length; j++) { tx = block.txs[j]; - yield this.flush(new packets.TXPacket(tx, item.hasWitness())); + yield this.send(new packets.TXPacket(tx, item.hasWitness())); } break; case constants.inv.CMPCT_BLOCK: // Fallback to full block. if (block.height < this.chain.tip.height - 10) { - yield this.flush(new packets.BlockPacket(block, false)); + yield this.send(new packets.BlockPacket(block, false)); break; } @@ -1634,7 +1574,8 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { break; } - yield this.flush(new packets.CmpctBlockPacket(block, false)); + yield this.send(new packets.CmpctBlockPacket(block, false)); + break; default: this.logger.warning( @@ -1646,7 +1587,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { } if (item.hash === this.hashContinue) { - this.sendInv(new InvItem(constants.inv.BLOCK, this.chain.tip.hash)); + yield this.sendInv(new InvItem(constants.inv.BLOCK, this.chain.tip.hash)); this.hashContinue = null; } } @@ -1658,7 +1599,7 @@ Peer.prototype.__handleGetData = co(function* _handleGetData(packet) { this.hostname); if (notFound.length > 0) - this.send(new packets.NotFoundPacket(notFound)); + yield this.send(new packets.NotFoundPacket(notFound)); }); /** @@ -1700,11 +1641,11 @@ Peer.prototype._handleAddr = function _handleAddr(packet) { * @param {PingPacket} */ -Peer.prototype._handlePing = function _handlePing(packet) { - if (packet.nonce) - this.send(new packets.PongPacket(packet.nonce)); +Peer.prototype._handlePing = co(function* _handlePing(packet) { this.fire('ping', this.minPing); -}; + if (packet.nonce) + yield this.send(new packets.PongPacket(packet.nonce)); +}); /** * Handle `pong` packet. @@ -1751,7 +1692,7 @@ Peer.prototype._handlePong = function _handlePong(packet) { * @param {GetAddrPacket} */ -Peer.prototype._handleGetAddr = function _handleGetAddr(packet) { +Peer.prototype._handleGetAddr = co(function* _handleGetAddr(packet) { var items = []; var i, addr; @@ -1788,8 +1729,8 @@ Peer.prototype._handleGetAddr = function _handleGetAddr(packet) { items.length, this.hostname); - this.send(new packets.AddrPacket(items)); -}; + yield this.send(new packets.AddrPacket(items)); +}); /** * Handle `inv` packet. @@ -1939,21 +1880,16 @@ Peer.prototype._handleAlert = function _handleAlert(alert) { * @param {EncinitPacket} */ -Peer.prototype._handleEncinit = function _handleEncinit(packet) { +Peer.prototype._handleEncinit = co(function* _handleEncinit(packet) { if (!this.bip151) return; - try { - this.bip151.encinit(packet.publicKey, packet.cipher); - } catch (e) { - this.error(e); - return; - } - - this.send(this.bip151.toEncack()); + this.bip151.encinit(packet.publicKey, packet.cipher); this.fire('encinit', packet); -}; + + yield this.send(this.bip151.toEncack()); +}); /** * Handle `encack` packet. @@ -1961,19 +1897,14 @@ Peer.prototype._handleEncinit = function _handleEncinit(packet) { * @param {EncackPacket} */ -Peer.prototype._handleEncack = function _handleEncack(packet) { +Peer.prototype._handleEncack = co(function* _handleEncack(packet) { if (!this.bip151) return; - try { - this.bip151.encack(packet.publicKey); - } catch (e) { - this.error(e); - return; - } + this.bip151.encack(packet.publicKey); this.fire('encack', packet); -}; +}); /** * Handle `authchallenge` packet. @@ -1981,23 +1912,18 @@ Peer.prototype._handleEncack = function _handleEncack(packet) { * @param {AuthChallengePacket} */ -Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(packet) { +Peer.prototype._handleAuthChallenge = co(function* _handleAuthChallenge(packet) { var sig; if (!this.bip150) return; - try { - sig = this.bip150.challenge(packet.hash); - } catch (e) { - this.error(e); - return; - } - - this.send(new packets.AuthReplyPacket(sig)); + sig = this.bip150.challenge(packet.hash); this.fire('authchallenge', packet.hash); -}; + + yield this.send(new packets.AuthReplyPacket(sig)); +}); /** * Handle `authreply` packet. @@ -2005,24 +1931,19 @@ Peer.prototype._handleAuthChallenge = function _handleAuthChallenge(packet) { * @param {AuthReplyPacket} */ -Peer.prototype._handleAuthReply = function _handleAuthReply(packet) { +Peer.prototype._handleAuthReply = co(function* _handleAuthReply(packet) { var hash; if (!this.bip150) return; - try { - hash = this.bip150.reply(packet.signature); - } catch (e) { - this.error(e); - return; - } + hash = this.bip150.reply(packet.signature); if (hash) - this.send(new packets.AuthProposePacket(hash)); + yield this.send(new packets.AuthProposePacket(hash)); this.fire('authreply', packet.signature); -}; +}); /** * Handle `authpropose` packet. @@ -2030,23 +1951,18 @@ Peer.prototype._handleAuthReply = function _handleAuthReply(packet) { * @param {AuthProposePacket} */ -Peer.prototype._handleAuthPropose = function _handleAuthPropose(packet) { +Peer.prototype._handleAuthPropose = co(function* _handleAuthPropose(packet) { var hash; if (!this.bip150) return; - try { - hash = this.bip150.propose(packet.hash); - } catch (e) { - this.error(e); - return; - } + hash = this.bip150.propose(packet.hash); - this.send(new packets.AuthChallengePacket(hash)); + yield this.send(new packets.AuthChallengePacket(hash)); this.fire('authpropose', packet.hash); -}; +}); /** * Handle an unknown packet. @@ -2127,7 +2043,7 @@ Peer.prototype._handleCmpctBlock = co(function* _handleCmpctBlock(packet) { return; } - this.send(new packets.GetBlockTxnPacket(block.toRequest())); + yield this.send(new packets.GetBlockTxnPacket(block.toRequest())); this.logger.debug( 'Received semi-full compact block %s (%s).', @@ -2184,7 +2100,7 @@ Peer.prototype._handleGetBlockTxn = co(function* _handleGetBlockTxn(packet) { res = bcoin.bip152.TXResponse.fromBlock(block, req); - yield this.flush(new packets.BlockTxnPacket(res, false)); + yield this.send(new packets.BlockTxnPacket(res, false)); this.fire('blocktxn', req); }); @@ -2229,9 +2145,9 @@ Peer.prototype._handleBlockTxn = function _handleBlockTxn(packet) { Peer.prototype.sendAlert = function sendAlert(alert) { if (!this.invFilter.added(alert.hash())) - return; + return Promise.resolve(null); - this.send(alert); + return this.send(alert); }; /** @@ -2260,7 +2176,7 @@ Peer.prototype.sendGetHeaders = function sendGetHeaders(locator, stop) { this.logger.debug('Height: %d, Hash: %s, Stop: %s', height, hash, stop); - this.send(packet); + return this.send(packet); }; /** @@ -2288,7 +2204,7 @@ Peer.prototype.sendGetBlocks = function getBlocks(locator, stop) { this.logger.debug('Height: %d, Hash: %s, Stop: %s', height, hash, stop); - this.send(packet); + return this.send(packet); }; /** @@ -2303,14 +2219,14 @@ Peer.prototype.sendMempool = function sendMempool() { this.logger.debug( 'Cannot request mempool for non-bloom peer (%s).', this.hostname); - return; + return Promise.resolve(null); } this.logger.debug( 'Requesting inv packet from peer with mempool (%s).', this.hostname); - this.send(new packets.MempoolPacket()); + return this.send(new packets.MempoolPacket()); }; /** @@ -2335,7 +2251,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { 'Sending reject packet to peer (%s).', this.hostname); - this.send(reject); + return this.send(reject); }; /** @@ -2344,7 +2260,7 @@ Peer.prototype.sendReject = function sendReject(code, reason, obj) { Peer.prototype.sendCompact = function sendCompact() { this.logger.info('Initializing compact blocks (%s).', this.hostname); - this.send(new packets.SendCmpctPacket(0, 1)); + return this.send(new packets.SendCmpctPacket(0, 1)); }; /** @@ -2392,9 +2308,10 @@ Peer.prototype.ignore = function ignore() { */ Peer.prototype.reject = function reject(obj, code, reason, score) { - this.sendReject(code, reason, obj); + var promise = this.sendReject(code, reason, obj); if (score > 0) this.setMisbehavior(score); + return promise; }; /** @@ -2419,7 +2336,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { return; } - this.sendGetBlocks(locator, root); + yield this.sendGetBlocks(locator, root); }); /** @@ -2431,7 +2348,7 @@ Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { var locator = yield this.chain.getLocator(tip); - this.sendGetHeaders(locator, stop); + return this.sendGetHeaders(locator, stop); }); /** @@ -2443,7 +2360,7 @@ Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) { var locator = yield this.chain.getLocator(tip); - this.sendGetBlocks(locator, stop); + return this.sendGetBlocks(locator, stop); }); /** @@ -2486,7 +2403,7 @@ Peer.prototype.sync = function sync() { return this.getHeaders(tip); } - this.getBlocks(); + return this.getBlocks(); }; /**