From 75430491809ab331c855bd9684a7368579664af9 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 19 Jan 2017 17:26:40 -0800 Subject: [PATCH] pool: refactor packet handling. --- lib/net/peer.js | 202 ++---- lib/net/pool.js | 1780 +++++++++++++++++++++++++++-------------------- 2 files changed, 1089 insertions(+), 893 deletions(-) diff --git a/lib/net/peer.js b/lib/net/peer.js index 5670509c..3b5b963f 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -31,7 +31,6 @@ var NetAddress = require('../primitives/netaddress'); var services = common.services; var invTypes = InvItem.types; var packetTypes = packets.types; -var VerifyResult = errors.VerifyResult; /** * Represents a remote peer. @@ -1501,6 +1500,13 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { if (entry) entry.resolve(packet); + + if (packet.type === packetTypes.UNKNOWN) { + this.emit('unknown', packet); + return; + } + + this.emit(packet.cmd, packet); }); /** @@ -1513,9 +1519,7 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { Peer.prototype.flushMerkle = co(function* flushMerkle() { assert(this.lastMerkle); - this.emit('merkleblock', this.lastMerkle); - - yield this.pool.handleBlock(this, this.lastMerkle); + yield this.pool.addBlock(this, this.lastMerkle); this.lastMerkle = null; this.waitingTX = 0; @@ -1535,6 +1539,8 @@ Peer.prototype.handleFilterLoad = co(function* handleFilterLoad(packet) { this.spvFilter = packet.filter; this.noRelay = false; + + yield this.pool.handleFilterLoad(this, packet); }); /** @@ -1555,12 +1561,14 @@ Peer.prototype.handleFilterAdd = co(function* handleFilterAdd(packet) { this.spvFilter.add(data); this.noRelay = false; + + yield this.pool.handleFilterAdd(this, packet); }); /** * Handle `filterclear` packet. * @private - * @param {FilterClearPacket} + * @param {FilterClearPacket} packet */ Peer.prototype.handleFilterClear = co(function* handleFilterClear(packet) { @@ -1568,54 +1576,24 @@ Peer.prototype.handleFilterClear = co(function* handleFilterClear(packet) { this.spvFilter.reset(); this.noRelay = false; + + yield this.pool.handleFilterClear(this, packet); }); /** * Handle `merkleblock` packet. * @private - * @param {MerkleBlockPacket} + * @param {MerkleBlockPacket} packet */ Peer.prototype.handleMerkleBlock = co(function* handleMerkleBlock(packet) { - var block = packet.block; - var ret = new VerifyResult(); - - // Potential DoS. - if (!this.options.spv) { - this.logger.warning( - 'Peer sent unsolicited merkleblock (%s).', - this.hostname); - this.increaseBan(100); - return; - } - - if (this.lastMerkle) { - this.logger.warning( - 'Peer sent a merkleblock prematurely (%s).', - this.hostname); - this.increaseBan(100); - return; - } - - if (!block.verify(ret)) { - this.logger.warning( - 'Peer sent an invalid merkleblock (%s).', - this.hostname); - this.reject(block, 'invalid', ret.reason, ret.score); - return; - } - - this.lastMerkle = block; - this.waitingTX = block.matches.length; - - if (this.waitingTX === 0) - yield this.flushMerkle(); + yield this.pool.handleMerkleBlock(this, packet); }); /** * Handle `feefilter` packet. * @private - * @param {FeeFilterPacket} + * @param {FeeFilterPacket} packet */ Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { @@ -1628,28 +1606,26 @@ Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { this.feeRate = rate; - this.emit('feefilter', rate); + yield this.pool.handleFeeFilter(this, packet); }); /** * Handle `getheaders` packet. * @private - * @param {GetHeadersPacket} + * @param {GetHeadersPacket} packet */ Peer.prototype.handleGetHeaders = co(function* handleGetHeaders(packet) { - this.emit('getheaders', packet); yield this.pool.handleGetHeaders(this, packet); }); /** * Handle `getblocks` packet. * @private - * @param {GetBlocksPacket} + * @param {GetBlocksPacket} packet */ Peer.prototype.handleGetBlocks = co(function* handleGetBlocks(packet) { - this.emit('getblocks', packet); yield this.pool.handleGetBlocks(this, packet); }); @@ -1700,8 +1676,6 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { } } - this.emit('version', packet); - this.send(new packets.VerackPacket()); yield this.pool.handleVersion(this, packet); @@ -1710,75 +1684,77 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { /** * Handle `verack` packet. * @private - * @param {VerackPacket} + * @param {VerackPacket} packet */ Peer.prototype.handleVerack = co(function* handleVerack(packet) { + if (this.ack) { + this.logger.debug('Peer sent duplicate ack (%s).', this.hostname); + return; + } + this.ack = true; - this.emit('verack', packet); this.logger.debug('Received verack (%s).', this.hostname); + + yield this.pool.handleVerack(this, packet); }); /** * Handle `mempool` packet. * @private - * @param {MempoolPacket} + * @param {MempoolPacket} packet */ Peer.prototype.handleMempool = co(function* handleMempool(packet) { - this.emit('mempool', packet); - yield this.pool.handleMempool(this); + yield this.pool.handleMempool(this, packet); }); /** * Handle `getdata` packet. * @private - * @param {GetDataPacket} + * @param {GetDataPacket} packet */ Peer.prototype.handleGetData = co(function* handleGetData(packet) { - this.emit('getdata', packet.items); - yield this.pool.handleGetData(this, packet.items); + yield this.pool.handleGetData(this, packet); }); /** * Handle `notfound` packet. * @private - * @param {NotFoundPacket} + * @param {NotFoundPacket} packet */ Peer.prototype.handleNotFound = co(function* handleNotFound(packet) { - this.emit('notfound', packet.items); - yield this.pool.handleNotFound(this, packet.items); + yield this.pool.handleNotFound(this, packet); }); /** * Handle `addr` packet. * @private - * @param {AddrPacket} + * @param {AddrPacket} packet */ Peer.prototype.handleAddr = co(function* handleAddr(packet) { - this.emit('addr', packet.items); - yield this.pool.handleAddr(this, packet.items); + yield this.pool.handleAddr(this, packet); }); /** * Handle `ping` packet. * @private - * @param {PingPacket} + * @param {PingPacket} packet */ Peer.prototype.handlePing = co(function* handlePing(packet) { - this.emit('ping', this.minPing); if (packet.nonce) this.send(new packets.PongPacket(packet.nonce)); + yield this.pool.handlePing(this, packet); }); /** * Handle `pong` packet. * @private - * @param {PongPacket} + * @param {PongPacket} packet */ Peer.prototype.handlePong = co(function* handlePong(packet) { @@ -1811,111 +1787,84 @@ Peer.prototype.handlePong = co(function* handlePong(packet) { this.challenge = null; - this.emit('pong', this.minPing); + yield this.pool.handlePong(this, packet); }); /** * Handle `getaddr` packet. * @private - * @param {GetAddrPacket} + * @param {GetAddrPacket} packet */ Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) { - this.emit('getaddr', packet); yield this.pool.handleGetAddr(this, packet); }); /** * Handle `inv` packet. * @private - * @param {InvPacket} + * @param {InvPacket} packet */ Peer.prototype.handleInv = co(function* handleInv(packet) { - this.emit('inv', packet.items); - yield this.pool.handleInv(this, packet.items); + yield this.pool.handleInv(this, packet); }); /** * Handle `headers` packet. * @private - * @param {HeadersPacket} + * @param {HeadersPacket} packet */ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { - this.emit('headers', packet.items); - yield this.pool.handleHeaders(this, packet.items); + yield this.pool.handleHeaders(this, packet); }); /** * Handle `sendheaders` packet. * @private - * @param {SendHeadersPacket} + * @param {SendHeadersPacket} packet */ Peer.prototype.handleSendHeaders = co(function* handleSendHeaders(packet) { this.preferHeaders = true; - this.emit('sendheaders'); + yield this.pool.handleSendHeaders(this, packet); }); /** * Handle `block` packet. * @private - * @param {BlockPacket} + * @param {BlockPacket} packet */ Peer.prototype.handleBlock = co(function* handleBlock(packet) { - if (this.options.spv) { - this.logger.warning( - 'Peer sent unsolicited block (%s).', - this.hostname); - return; - } - - this.emit('block', packet.block); - - yield this.pool.handleBlock(this, packet.block); + yield this.pool.handleBlock(this, packet); }); /** * Handle `tx` packet. * @private - * @param {TXPacket} + * @param {TXPacket} packet */ Peer.prototype.handleTX = co(function* handleTX(packet) { - var tx = packet.tx; - - if (this.lastMerkle) { - assert(this.waitingTX > 0); - if (this.lastMerkle.hasTX(tx)) { - this.lastMerkle.addTX(tx); - if (--this.waitingTX === 0) - yield this.flushMerkle(); - return; - } - } - - this.emit('tx', tx); - - yield this.pool.handleTX(this, tx); + yield this.pool.handleTX(this, packet); }); /** * Handle `reject` packet. * @private - * @param {RejectPacket} reject + * @param {RejectPacket} packet */ -Peer.prototype.handleReject = co(function* handleReject(reject) { - this.emit('reject', reject); - yield this.pool.handleReject(this, reject); +Peer.prototype.handleReject = co(function* handleReject(packet) { + yield this.pool.handleReject(this, packet); }); /** * Handle `encinit` packet. * @private - * @param {EncinitPacket} + * @param {EncinitPacket} packet */ Peer.prototype.handleEncinit = co(function* handleEncinit(packet) { @@ -1924,15 +1873,15 @@ Peer.prototype.handleEncinit = co(function* handleEncinit(packet) { this.bip151.encinit(packet.publicKey, packet.cipher); - this.emit('encinit', packet); - this.send(this.bip151.toEncack()); + + yield this.pool.handleEncinit(this, packet); }); /** * Handle `encack` packet. * @private - * @param {EncackPacket} + * @param {EncackPacket} packet */ Peer.prototype.handleEncack = co(function* handleEncack(packet) { @@ -1941,13 +1890,13 @@ Peer.prototype.handleEncack = co(function* handleEncack(packet) { this.bip151.encack(packet.publicKey); - this.emit('encack', packet); + yield this.pool.handleEncack(this, packet); }); /** * Handle `authchallenge` packet. * @private - * @param {AuthChallengePacket} + * @param {AuthChallengePacket} packet */ Peer.prototype.handleAuthChallenge = co(function* handleAuthChallenge(packet) { @@ -1958,15 +1907,15 @@ Peer.prototype.handleAuthChallenge = co(function* handleAuthChallenge(packet) { sig = this.bip150.challenge(packet.hash); - this.emit('authchallenge', packet.hash); - this.send(new packets.AuthReplyPacket(sig)); + + yield this.pool.handleAuthChallenge(this, packet); }); /** * Handle `authreply` packet. * @private - * @param {AuthReplyPacket} + * @param {AuthReplyPacket} packet */ Peer.prototype.handleAuthReply = co(function* handleAuthReply(packet) { @@ -1980,13 +1929,13 @@ Peer.prototype.handleAuthReply = co(function* handleAuthReply(packet) { if (hash) this.send(new packets.AuthProposePacket(hash)); - this.emit('authreply', packet.signature); + yield this.pool.handleAuthReply(this, packet); }); /** * Handle `authpropose` packet. * @private - * @param {AuthProposePacket} + * @param {AuthProposePacket} packet */ Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(packet) { @@ -1999,18 +1948,17 @@ Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(packet) { this.send(new packets.AuthChallengePacket(hash)); - this.emit('authpropose', packet.hash); + yield this.pool.handleAuthPropose(this, packet); }); /** * Handle an unknown packet. * @private - * @param {UnknownPacket} + * @param {UnknownPacket} packet */ Peer.prototype.handleUnknown = co(function* handleUnknown(packet) { - this.logger.warning('Unknown packet: %s.', packet.cmd); - this.emit('unknown', packet); + yield this.pool.handleUnknown(this, packet); }); /** @@ -2046,7 +1994,8 @@ Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { this.compactMode = packet.mode; this.compactWitness = packet.version === 2; - this.emit('sendcmpct', packet); + + yield this.pool.handleSendCmpct(this, packet); }); /** @@ -2056,8 +2005,7 @@ Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { */ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { - this.emit('cmpctblock', packet.block); - yield this.pool.handleCmpctBlock(this, packet.block); + yield this.pool.handleCmpctBlock(this, packet); }); /** @@ -2067,8 +2015,7 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { */ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { - this.emit('getblocktxn', packet.request); - yield this.pool.handleGetBlockTxn(this, packet.request); + yield this.pool.handleGetBlockTxn(this, packet); }); /** @@ -2078,8 +2025,7 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { */ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { - this.emit('blocktxn', packet.response); - yield this.pool.handleGetBlockTxn(this, packet.response); + yield this.pool.handleGetBlockTxn(this, packet); }); /** diff --git a/lib/net/pool.js b/lib/net/pool.js index 1562e19c..d4534e0e 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -34,6 +34,7 @@ var Map = require('../utils/map'); var packets = require('./packets'); var invTypes = InvItem.types; var VerifyError = errors.VerifyError; +var VerifyResult = errors.VerifyResult; /** * A pool of peers for handling all network activity. @@ -609,6 +610,200 @@ Pool.prototype.sendGetAddr = function sendGetAddr() { peer.sendGetAddr(); }; +/** + * Request current header chain blocks. + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype.resolveHeaders = co(function* resolveHeaders(peer) { + var items = []; + var node; + + if (!this.headerNext) + return; + + for (node = this.headerNext; node; node = node.next) { + this.headerNext = node.next; + + if (yield this.hasBlock(node.hash)) + continue; + + items.push(node.hash); + } + + // Checkpoints should never be + // spaced more than 50k blocks + // apart from each other. + assert(items.length <= 50000); + + this.getBlock(peer, items); +}); + +/** + * Find the next highest checkpoint. + * @private + * @param {Number} height + * @returns {Object} + */ + +Pool.prototype.getNextCheckpoint = function getNextCheckpoint(height) { + var i, next; + + for (i = 0; i < this.checkpoints.length; i++) { + next = this.checkpoints[i]; + if (next.height > height) + return next; + } +}; + +/** + * Announce broadcast list to peer. + * @param {Peer} peer + */ + +Pool.prototype.announceList = function announceList(peer) { + var blocks = []; + var txs = []; + var hashes = this.invMap.keys(); + var i, hash, item; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + item = this.invMap.get(hash); + + switch (item.type) { + case invTypes.BLOCK: + blocks.push(item.msg); + break; + case invTypes.TX: + txs.push(item.msg); + break; + default: + assert(false, 'Bad item type.'); + break; + } + } + + if (blocks.length > 0) + peer.announceBlock(blocks); + + if (txs.length > 0) + peer.announceTX(txs); +}; + +/** + * Get a block/tx from the broadcast map. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Promise} + */ + +Pool.prototype.getBroadcasted = function getBroadcasted(peer, item) { + var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; + var entry = this.invMap.get(item.hash); + + if (!entry) + return; + + if (type !== entry.type) { + this.logger.debug( + 'Peer requested item with the wrong type (%s).', + peer.hostname); + return; + } + + this.logger.debug( + 'Peer requested %s %s as a %s packet (%s).', + item.isTX() ? 'tx' : 'block', + item.rhash(), + item.hasWitness() ? 'witness' : 'normal', + peer.hostname); + + entry.ack(peer); + + return entry.msg; +}; + +/** + * Get a block/tx either from the broadcast map, mempool, or blockchain. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Promise} + */ + +Pool.prototype.getItem = co(function* getItem(peer, item) { + var entry = this.getBroadcasted(peer, item); + + if (entry) + return entry; + + if (this.options.selfish) + return; + + if (item.isTX()) { + if (!this.mempool) + return; + return this.mempool.getTX(item.hash); + } + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + return yield this.chain.db.getBlock(item.hash); +}); + +/** + * Send a block from the broadcast list or chain. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Boolean} + */ + +Pool.prototype.sendBlock = co(function* sendBlock(peer, item, witness) { + var block = this.getBroadcasted(peer, item); + + // Check for a broadcasted item first. + if (block) { + peer.send(new packets.BlockPacket(block, witness)); + return true; + } + + if (this.options.selfish + || this.chain.options.spv + || this.chain.options.prune) { + return false; + } + + // If we have the same serialization, we + // can write the raw binary to the socket. + if (witness === this.chain.options.witness) { + block = yield this.chain.db.getRawBlock(item.hash); + + if (block) { + peer.sendRaw('block', block); + return true; + } + + return false; + } + + block = yield this.chain.db.getBlock(item.hash); + + if (block) { + peer.send(new packets.BlockPacket(block, witness)); + return true; + } + + return false; +}); + /** * Create an outbound peer with no special purpose. * @private @@ -779,14 +974,94 @@ Pool.prototype.handleVersion = co(function* handleVersion(peer, packet) { this.emit('version', packet, peer); }); +/** + * Handle `verack` packet. + * @private + * @param {Peer} peer + * @param {VerackPacket} packet + */ + +Pool.prototype.handleVerack = co(function* handleVerack(peer, packet) { + ; +}); + +/** + * Handle `ping` packet. + * @private + * @param {Peer} peer + * @param {PingPacket} packet + */ + +Pool.prototype.handlePing = co(function* handlePing(peer, packet) { + ; +}); + +/** + * Handle `pong` packet. + * @private + * @param {Peer} peer + * @param {PongPacket} packet + */ + +Pool.prototype.handlePong = co(function* handlePong(peer, packet) { + ; +}); + +/** + * Handle `getaddr` packet. + * @private + * @param {Peer} peer + * @param {GetAddrPacket} packet + */ + +Pool.prototype.handleGetAddr = co(function* handleGetAddr(peer, packet) { + var items = []; + var i, addrs, addr; + + if (this.options.selfish) + return; + + if (peer.sentAddr) { + this.logger.debug('Ignoring repeated getaddr (%s).', peer.hostname); + return; + } + + peer.sentAddr = true; + + addrs = this.hosts.toArray(); + + for (i = 0; i < addrs.length; i++) { + addr = addrs[i]; + + if (!peer.addrFilter.added(addr.hostname, 'ascii')) + continue; + + items.push(addr); + + if (items.length === 1000) + break; + } + + if (items.length === 0) + return; + + this.logger.debug( + 'Sending %d addrs to peer (%s)', + items.length, + peer.hostname); + + peer.send(new packets.AddrPacket(items)); +}); + /** * Handle peer addr event. * @private * @param {Peer} peer - * @param {NetAddress[]} addrs + * @param {AddrPacket} packet */ -Pool.prototype.handleAddr = co(function* handleAddr(peer, addrs) { +Pool.prototype.handleAddr = co(function* handleAddr(peer, packet) { + var addrs = packet.items; var now = this.network.now(); var services = this.options.requiredServices; var i, addr; @@ -823,384 +1098,17 @@ Pool.prototype.handleAddr = co(function* handleAddr(peer, addrs) { this.fillOutbound(); }); -/** - * Handle `block` packet. Attempt to add to chain. - * @private - * @param {Peer} peer - * @param {MemBlock|MerkleBlock} block - * @returns {Promise} - */ - -Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { - var hash = block.hash('hex'); - var unlock = yield this.locker.lock(hash); - try { - return yield this._handleBlock(peer, block); - } finally { - unlock(); - } -}); - -/** - * Handle `block` packet. Attempt to add to chain (without a lock). - * @private - * @param {Peer} peer - * @param {MemBlock|MerkleBlock} block - * @returns {Promise} - */ - -Pool.prototype._handleBlock = co(function* handleBlock(peer, block) { - var hash = block.hash('hex'); - var isCheckpoint = false; - var requested, node, checkpoint; - - if (!this.syncing) - return; - - requested = this.fulfill(peer, hash); - - // Someone is sending us blocks without - // us requesting them. - if (!requested) { - peer.invFilter.add(block.hash()); - this.logger.warning( - 'Received unrequested block: %s (%s).', - block.rhash(), peer.hostname); - return; - } - - if (this.headersFirst) { - node = this.headerChain.head; - assert(node); - - if (hash === node.hash) { - if (hash === this.nextCheckpoint.hash) { - this.logger.info( - 'Received checkpoint block %s (%d).', - hash, node.height); - isCheckpoint = true; - } else { - this.headerChain.shift(); - } - } - } - - peer.lastBlock = util.ms(); - - try { - yield this.chain.add(block); - } catch (err) { - if (err.type === 'VerifyError') { - if (err.reason === 'bad-prevblk') { - if (this.headersFirst) { - peer.increaseBan(10); - this.emit('error', err, peer); - return; - } - this.logger.debug('Peer sent an orphan block. Resolving.'); - yield this.resolveOrphan(peer, hash); - return; - } - peer.reject(block, err.code, err.reason, err.score); - this.emit('error', err, peer); - return; - } - throw err; - } - - this.logStatus(block); - - if (!this.headersFirst) - return; - - if (!isCheckpoint) { - yield this.resolveHeaders(peer); - return; - } - - node = this.nextCheckpoint; - checkpoint = this.getNextCheckpoint(node.height); - - if (checkpoint) { - this.nextCheckpoint = checkpoint; - peer.sendGetHeaders([node.hash], checkpoint.hash); - return; - } - - this.headersFirst = false; - this.nextCheckpoint = null; - this.headerChain.reset(); - this.headerNext = null; - - peer.sendGetBlocks([hash], null); -}); - -/** - * Log sync status. - * @private - * @param {Block} block - */ - -Pool.prototype.logStatus = function logStatus(block) { - if (this.logger.level >= 4 && this.chain.total % 20 === 0) { - this.logger.debug('Status:' - + ' ts=%s height=%d progress=%s' - + ' blocks=%d orphans=%d active=%d' - + ' target=%s peers=%d' - + ' pending=%d jobs=%d', - util.date(block.ts), - this.chain.height, - (this.chain.getProgress() * 100).toFixed(2) + '%', - this.chain.total, - this.chain.orphanCount, - this.requestMap.size, - block.bits, - this.peers.size(), - this.locker.pending, - this.locker.jobs.length); - } - - if (this.chain.total % 2000 === 0) { - this.logger.info( - 'Received 2000 more blocks (height=%d, hash=%s).', - this.chain.height, - block.rhash()); - } -}; - -/** - * Handle a transaction. Attempt to add to mempool. - * @private - * @param {Peer} peer - * @param {TX} tx - * @returns {Promise} - */ - -Pool.prototype.handleTX = co(function* handleTX(peer, tx) { - var hash = tx.hash('hex'); - var unlock = yield this.locker.lock(hash); - try { - return yield this._handleTX(peer, tx); - } finally { - unlock(); - } -}); - -/** - * Handle a transaction. Attempt to add to mempool (without a lock). - * @private - * @param {Peer} peer - * @param {TX} tx - * @returns {Promise} - */ - -Pool.prototype._handleTX = co(function* handleTX(peer, tx) { - var hash = tx.hash('hex'); - var requested = this.fulfill(peer, hash); - var missing; - - if (!requested) { - this.logger.warning('Peer sent unrequested tx: %s (%s).', - tx.txid(), peer.hostname); - - peer.invFilter.add(tx.hash()); - } - - if (!this.mempool) { - if (!requested) - this.txFilter.add(tx.hash()); - this.emit('tx', tx, peer); - return; - } - - if (!requested) { - if (this.mempool.hasReject(tx.hash())) { - throw new VerifyError(tx, - 'alreadyknown', - 'txn-already-in-mempool', - 0); - } - } - - try { - missing = yield this.mempool.addTX(tx); - } catch (err) { - if (err.type === 'VerifyError') { - peer.reject(tx, err.code, err.reason, err.score); - this.emit('error', err, peer); - return; - } - throw err; - } - - if (missing) { - this.logger.debug( - 'Requesting %d missing transactions (%s).', - missing.length, peer.hostname); - - this.getTX(peer, missing); - } - - this.emit('tx', tx, peer); -}); - -/** - * Handle `headers` packet from a given peer. - * @private - * @param {Peer} peer - * @param {Headers[]} headers - * @returns {Promise} - */ - -Pool.prototype.handleHeaders = co(function* handleHeaders(peer, headers) { - var unlock = yield this.locker.lock(); - try { - return yield this._handleHeaders(peer, headers); - } finally { - unlock(); - } -}); - -/** - * Handle `headers` packet from - * a given peer without a lock. - * @private - * @param {Peer} peer - * @param {Headers[]} headers - * @returns {Promise} - */ - -Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { - var isCheckpoint = false; - var i, header, hash, height, last, node; - - if (!this.headersFirst) - return; - - if (!this.syncing) - return; - - if (!peer.isLoader()) - return; - - if (headers.length > 2000) { - this.increaseBan(100); - return; - } - - if (headers.length === 0) - return; - - assert(this.headerChain.size > 0); - - for (i = 0; i < headers.length; i++) { - header = headers[i]; - hash = header.hash('hex'); - last = this.headerChain.tail; - height = last.height + 1; - - if (header.prevBlock !== last.hash) { - peer.increaseBan(100); - throw new Error('Bad header chain.'); - } - - if (!header.verify()) { - peer.increaseBan(100); - throw new Error('Invalid header.'); - } - - node = new BlockNode(hash, last.height + 1); - - if (!this.headerNext) - this.headerNext = node; - - if (node.height === this.nextCheckpoint.height) { - if (node.hash !== this.nextCheckpoint.hash) { - peer.increaseBan(100); - throw new Error('Bad checkpoint header.'); - } - isCheckpoint = true; - } - - this.headerChain.push(node); - } - - this.emit('headers', headers, peer); - - this.logger.debug( - 'Received %s headers from peer (%s).', - headers.length, - peer.hostname); - - // Request the hashes we just added. - if (isCheckpoint) { - this.headerChain.shift(); - yield this.resolveHeaders(peer); - return; - } - - // Restart the getheaders process. - peer.sendGetHeaders([node.hash], this.nextCheckpoint.hash); -}); - -/** - * Request current header chain blocks. - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.resolveHeaders = co(function* resolveHeaders(peer) { - var items = []; - var node; - - if (!this.headerNext) - return; - - for (node = this.headerNext; node; node = node.next) { - this.headerNext = node.next; - - if (yield this.hasBlock(node.hash)) - continue; - - items.push(node.hash); - } - - // Checkpoints should never be - // spaced more than 50k blocks - // apart from each other. - assert(items.length <= 50000); - - this.getBlock(peer, items); -}); - -/** - * Find the next highest checkpoint. - * @private - * @param {Number} height - * @returns {Object} - */ - -Pool.prototype.getNextCheckpoint = function getNextCheckpoint(height) { - var i, next; - - for (i = 0; i < this.checkpoints.length; i++) { - next = this.checkpoints[i]; - if (next.height > height) - return next; - } -}; - /** * Handle `inv` packet. * @private * @param {Peer} peer - * @param {InvItem[]} items + * @param {InvPacket} packet */ -Pool.prototype.handleInv = co(function* handleInv(peer, items) { +Pool.prototype.handleInv = co(function* handleInv(peer, packet) { var unlock = yield this.locker.lock(); try { - return yield this._handleInv(peer, items); + return yield this._handleInv(peer, packet); } finally { unlock(); } @@ -1210,10 +1118,11 @@ Pool.prototype.handleInv = co(function* handleInv(peer, items) { * Handle `inv` packet (without a lock). * @private * @param {Peer} peer - * @param {InvItem[]} items + * @param {InvPacket} packet */ -Pool.prototype._handleInv = co(function* handleInv(peer, items) { +Pool.prototype._handleInv = co(function* handleInv(peer, packet) { + var items = packet.items; var blocks = []; var txs = []; var unknown = -1; @@ -1348,348 +1257,15 @@ Pool.prototype.handleTXInv = co(function* handleTXInv(peer, hashes) { this.getTX(peer, items); }); -/** - * Announce broadcast list to peer. - * @param {Peer} peer - */ - -Pool.prototype.announceList = function announceList(peer) { - var blocks = []; - var txs = []; - var hashes = this.invMap.keys(); - var i, hash, item; - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - item = this.invMap.get(hash); - - switch (item.type) { - case invTypes.BLOCK: - blocks.push(item.msg); - break; - case invTypes.TX: - txs.push(item.msg); - break; - default: - assert(false, 'Bad item type.'); - break; - } - } - - if (blocks.length > 0) - peer.announceBlock(blocks); - - if (txs.length > 0) - peer.announceTX(txs); -}; - -/** - * Handle peer reject event. - * @private - * @param {Peer} peer - * @param {RejectPacket} reject - */ - -Pool.prototype.handleReject = co(function* handleReject(peer, reject) { - var entry; - - this.logger.warning( - 'Received reject (%s): msg=%s code=%s reason=%s hash=%s.', - peer.hostname, - reject.message, - reject.getCode(), - reject.reason, - reject.rhash()); - - this.emit('reject', reject, peer); - - if (!reject.hash) - return; - - entry = this.invMap.get(reject.hash); - - if (!entry) - return; - - entry.reject(peer); -}); - -/** - * Handle peer notfound packet. - * @private - * @param {Peer} peer - * @param {InvItem[]} items - */ - -Pool.prototype.handleNotFound = co(function* handleNotFound(peer, items) { - var i, item; - - for (i = 0; i < items.length; i++) { - item = items[i]; - this.fulfill(peer, item.hash); - } -}); - -/** - * Handle `getaddr` packet. - * @private - * @param {Peer} peer - * @param {GetAddrPacket} packet - */ - -Pool.prototype.handleGetAddr = co(function* handleGetAddr(peer, packet) { - var items = []; - var i, addrs, addr; - - if (this.options.selfish) - return; - - if (peer.sentAddr) { - this.logger.debug('Ignoring repeated getaddr (%s).', peer.hostname); - return; - } - - peer.sentAddr = true; - - addrs = this.hosts.toArray(); - - for (i = 0; i < addrs.length; i++) { - addr = addrs[i]; - - if (!peer.addrFilter.added(addr.hostname, 'ascii')) - continue; - - items.push(addr); - - if (items.length === 1000) - break; - } - - if (items.length === 0) - return; - - this.logger.debug( - 'Sending %d addrs to peer (%s)', - items.length, - peer.hostname); - - peer.send(new packets.AddrPacket(items)); -}); - -/** - * Handle `getheaders` packet. - * @private - * @param {Peer} peer - * @param {GetHeadersPacket} packet - */ - -Pool.prototype.handleGetHeaders = co(function* handleGetHeaders(peer, packet) { - var headers = []; - var hash, entry; - - if (!this.chain.synced) - return; - - if (this.options.selfish) - return; - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - if (packet.locator.length > 0) { - hash = yield this.chain.findLocator(packet.locator); - if (hash) - hash = yield this.chain.db.getNextHash(hash); - } else { - hash = packet.stop; - } - - if (hash) - entry = yield this.chain.db.getEntry(hash); - - while (entry) { - headers.push(entry.toHeaders()); - - if (headers.length === 2000) - break; - - if (entry.hash === packet.stop) - break; - - entry = yield entry.getNext(); - } - - peer.sendHeaders(headers); -}); - -/** - * Handle `getblocks` packet. - * @private - * @param {Peer} - * @param {GetBlocksPacket} - */ - -Pool.prototype.handleGetBlocks = co(function* handleGetBlocks(peer, packet) { - var blocks = []; - var hash; - - if (!this.chain.synced) - return; - - if (this.options.selfish) - return; - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - hash = yield this.chain.findLocator(packet.locator); - - if (hash) - hash = yield this.chain.db.getNextHash(hash); - - while (hash) { - blocks.push(new InvItem(invTypes.BLOCK, hash)); - - if (hash === packet.stop) - break; - - if (blocks.length === 500) { - peer.hashContinue = hash; - break; - } - - hash = yield this.chain.db.getNextHash(hash); - } - - peer.sendInv(blocks); -}); - -/** - * Get a block/tx from the broadcast map. - * @private - * @param {Peer} peer - * @param {InvItem} item - * @returns {Promise} - */ - -Pool.prototype.getBroadcasted = function getBroadcasted(peer, item) { - var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; - var entry = this.invMap.get(item.hash); - - if (!entry) - return; - - if (type !== entry.type) { - this.logger.debug( - 'Peer requested item with the wrong type (%s).', - peer.hostname); - return; - } - - this.logger.debug( - 'Peer requested %s %s as a %s packet (%s).', - item.isTX() ? 'tx' : 'block', - item.rhash(), - item.hasWitness() ? 'witness' : 'normal', - peer.hostname); - - entry.ack(peer); - - return entry.msg; -}; - -/** - * Get a block/tx either from the broadcast map, mempool, or blockchain. - * @private - * @param {Peer} peer - * @param {InvItem} item - * @returns {Promise} - */ - -Pool.prototype.getItem = co(function* getItem(peer, item) { - var entry = this.getBroadcasted(peer, item); - - if (entry) - return entry; - - if (this.options.selfish) - return; - - if (item.isTX()) { - if (!this.mempool) - return; - return this.mempool.getTX(item.hash); - } - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - return yield this.chain.db.getBlock(item.hash); -}); - -/** - * Send a block from the broadcast list or chain. - * @private - * @param {Peer} peer - * @param {InvItem} item - * @returns {Boolean} - */ - -Pool.prototype.sendBlock = co(function* sendBlock(peer, item, witness) { - var block = this.getBroadcasted(peer, item); - - // Check for a broadcasted item first. - if (block) { - peer.send(new packets.BlockPacket(block, witness)); - return true; - } - - if (this.options.selfish - || this.chain.options.spv - || this.chain.options.prune) { - return false; - } - - // If we have the same serialization, we - // can write the raw binary to the socket. - if (witness === this.chain.options.witness) { - block = yield this.chain.db.getRawBlock(item.hash); - - if (block) { - peer.sendRaw('block', block); - return true; - } - - return false; - } - - block = yield this.chain.db.getBlock(item.hash); - - if (block) { - peer.send(new packets.BlockPacket(block, witness)); - return true; - } - - return false; -}); - /** * Handle `getdata` packet. * @private * @param {Peer} peer - * @param {InvItem[]} items + * @param {GetDataPacket} packet */ -Pool.prototype.handleGetData = co(function* handleGetData(peer, items) { +Pool.prototype.handleGetData = co(function* handleGetData(peer, packet) { + var items = packet.items; var notFound = []; var txs = 0; var blocks = 0; @@ -1826,14 +1402,651 @@ Pool.prototype.handleGetData = co(function* handleGetData(peer, items) { this.emit('getdata', items, peer); }); +/** + * Handle peer notfound packet. + * @private + * @param {Peer} peer + * @param {NotFoundPacket} packet + */ + +Pool.prototype.handleNotFound = co(function* handleNotFound(peer, packet) { + var items = packet.items; + var i, item; + + for (i = 0; i < items.length; i++) { + item = items[i]; + this.fulfill(peer, item.hash); + } +}); + +/** + * Handle `getblocks` packet. + * @private + * @param {Peer} peer + * @param {GetBlocksPacket} packet + */ + +Pool.prototype.handleGetBlocks = co(function* handleGetBlocks(peer, packet) { + var blocks = []; + var hash; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + hash = yield this.chain.findLocator(packet.locator); + + if (hash) + hash = yield this.chain.db.getNextHash(hash); + + while (hash) { + blocks.push(new InvItem(invTypes.BLOCK, hash)); + + if (hash === packet.stop) + break; + + if (blocks.length === 500) { + peer.hashContinue = hash; + break; + } + + hash = yield this.chain.db.getNextHash(hash); + } + + peer.sendInv(blocks); +}); + +/** + * Handle `getheaders` packet. + * @private + * @param {Peer} peer + * @param {GetHeadersPacket} packet + */ + +Pool.prototype.handleGetHeaders = co(function* handleGetHeaders(peer, packet) { + var headers = []; + var hash, entry; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + if (packet.locator.length > 0) { + hash = yield this.chain.findLocator(packet.locator); + if (hash) + hash = yield this.chain.db.getNextHash(hash); + } else { + hash = packet.stop; + } + + if (hash) + entry = yield this.chain.db.getEntry(hash); + + while (entry) { + headers.push(entry.toHeaders()); + + if (headers.length === 2000) + break; + + if (entry.hash === packet.stop) + break; + + entry = yield entry.getNext(); + } + + peer.sendHeaders(headers); +}); + +/** + * Handle `headers` packet from a given peer. + * @private + * @param {Peer} peer + * @param {HeadersPacket} packet + * @returns {Promise} + */ + +Pool.prototype.handleHeaders = co(function* handleHeaders(peer, packet) { + var unlock = yield this.locker.lock(); + try { + return yield this._handleHeaders(peer, packet); + } finally { + unlock(); + } +}); + +/** + * Handle `headers` packet from + * a given peer without a lock. + * @private + * @param {Peer} peer + * @param {HeadersPacket} packet + * @returns {Promise} + */ + +Pool.prototype._handleHeaders = co(function* handleHeaders(peer, packet) { + var headers = packet.items; + var isCheckpoint = false; + var i, header, hash, height, last, node; + + if (!this.headersFirst) + return; + + if (!this.syncing) + return; + + if (!peer.isLoader()) + return; + + if (headers.length > 2000) { + this.increaseBan(100); + return; + } + + if (headers.length === 0) + return; + + assert(this.headerChain.size > 0); + + for (i = 0; i < headers.length; i++) { + header = headers[i]; + hash = header.hash('hex'); + last = this.headerChain.tail; + height = last.height + 1; + + if (header.prevBlock !== last.hash) { + peer.increaseBan(100); + throw new Error('Bad header chain.'); + } + + if (!header.verify()) { + peer.increaseBan(100); + throw new Error('Invalid header.'); + } + + node = new BlockNode(hash, last.height + 1); + + if (!this.headerNext) + this.headerNext = node; + + if (node.height === this.nextCheckpoint.height) { + if (node.hash !== this.nextCheckpoint.hash) { + peer.increaseBan(100); + throw new Error('Bad checkpoint header.'); + } + isCheckpoint = true; + } + + this.headerChain.push(node); + } + + this.emit('headers', headers, peer); + + this.logger.debug( + 'Received %s headers from peer (%s).', + headers.length, + peer.hostname); + + // Request the hashes we just added. + if (isCheckpoint) { + this.headerChain.shift(); + yield this.resolveHeaders(peer); + return; + } + + // Restart the getheaders process. + peer.sendGetHeaders([node.hash], this.nextCheckpoint.hash); +}); + +/** + * Handle `sendheaders` packet. + * @private + * @param {Peer} peer + * @param {SendHeadersPacket} packet + * @returns {Promise} + */ + +Pool.prototype.handleSendHeaders = co(function* handleSendHeaders(peer, packet) { + this.emit('sendheader', packet, peer); +}); + +/** + * Handle `block` packet. Attempt to add to chain. + * @private + * @param {Peer} peer + * @param {BlockPacket} packet + * @returns {Promise} + */ + +Pool.prototype.handleBlock = co(function* handleBlock(peer, packet) { + if (this.options.spv) { + this.logger.warning( + 'Peer sent unsolicited block (%s).', + this.hostname); + return; + } + + return yield this.addBlock(peer, packet.block); +}); + +/** + * Handle `block` packet. Attempt to add to chain. + * @private + * @param {Peer} peer + * @param {Block} block + * @returns {Promise} + */ + +Pool.prototype.addBlock = co(function* addBlock(peer, block) { + var hash = block.hash('hex'); + var unlock = yield this.locker.lock(hash); + try { + return yield this._addBlock(peer, block); + } finally { + unlock(); + } +}); + +/** + * Handle `block` packet. Attempt to add to chain (without a lock). + * @private + * @param {Peer} peer + * @param {Block} block + * @returns {Promise} + */ + +Pool.prototype._addBlock = co(function* addBlock(peer, block) { + var hash = block.hash('hex'); + var isCheckpoint = false; + var requested, node, checkpoint; + + if (!this.syncing) + return; + + requested = this.fulfill(peer, hash); + + // Someone is sending us blocks without + // us requesting them. + if (!requested) { + peer.invFilter.add(block.hash()); + this.logger.warning( + 'Received unrequested block: %s (%s).', + block.rhash(), peer.hostname); + return; + } + + if (this.headersFirst) { + node = this.headerChain.head; + assert(node); + + if (hash === node.hash) { + if (hash === this.nextCheckpoint.hash) { + this.logger.info( + 'Received checkpoint block %s (%d).', + hash, node.height); + isCheckpoint = true; + } else { + this.headerChain.shift(); + } + } + } + + peer.lastBlock = util.ms(); + + try { + yield this.chain.add(block); + } catch (err) { + if (err.type === 'VerifyError') { + if (err.reason === 'bad-prevblk') { + if (this.headersFirst) { + peer.increaseBan(10); + this.emit('error', err, peer); + return; + } + this.logger.debug('Peer sent an orphan block. Resolving.'); + yield this.resolveOrphan(peer, hash); + return; + } + peer.reject(block, err.code, err.reason, err.score); + this.emit('error', err, peer); + return; + } + throw err; + } + + this.logStatus(block); + + if (!this.headersFirst) + return; + + if (!isCheckpoint) { + yield this.resolveHeaders(peer); + return; + } + + node = this.nextCheckpoint; + checkpoint = this.getNextCheckpoint(node.height); + + if (checkpoint) { + this.nextCheckpoint = checkpoint; + peer.sendGetHeaders([node.hash], checkpoint.hash); + return; + } + + this.headersFirst = false; + this.nextCheckpoint = null; + this.headerChain.reset(); + this.headerNext = null; + + peer.sendGetBlocks([hash], null); +}); + +/** + * Log sync status. + * @private + * @param {Block} block + */ + +Pool.prototype.logStatus = function logStatus(block) { + if (this.logger.level >= 4 && this.chain.total % 20 === 0) { + this.logger.debug('Status:' + + ' ts=%s height=%d progress=%s' + + ' blocks=%d orphans=%d active=%d' + + ' target=%s peers=%d' + + ' pending=%d jobs=%d', + util.date(block.ts), + this.chain.height, + (this.chain.getProgress() * 100).toFixed(2) + '%', + this.chain.total, + this.chain.orphanCount, + this.requestMap.size, + block.bits, + this.peers.size(), + this.locker.pending, + this.locker.jobs.length); + } + + if (this.chain.total % 2000 === 0) { + this.logger.info( + 'Received 2000 more blocks (height=%d, hash=%s).', + this.chain.height, + block.rhash()); + } +}; + +/** + * Handle a transaction. Attempt to add to mempool. + * @private + * @param {Peer} peer + * @param {TXPacket} packet + * @returns {Promise} + */ + +Pool.prototype.handleTX = co(function* handleTX(peer, packet) { + var hash = packet.tx.hash('hex'); + var unlock = yield this.locker.lock(hash); + try { + return yield this._handleTX(peer, packet); + } finally { + unlock(); + } +}); + +/** + * Handle a transaction. Attempt to add to mempool (without a lock). + * @private + * @param {Peer} peer + * @param {TXPacket} packet + * @returns {Promise} + */ + +Pool.prototype._handleTX = co(function* handleTX(peer, packet) { + var tx = packet.tx; + + if (peer.lastMerkle) { + assert(peer.waitingTX > 0); + if (peer.lastMerkle.hasTX(tx)) { + peer.lastMerkle.addTX(tx); + if (--peer.waitingTX === 0) + yield peer.flushMerkle(); + return; + } + } + + var hash = tx.hash('hex'); + var requested = this.fulfill(peer, hash); + var missing; + + if (!requested) { + this.logger.warning('Peer sent unrequested tx: %s (%s).', + tx.txid(), peer.hostname); + + peer.invFilter.add(tx.hash()); + } + + if (!this.mempool) { + if (!requested) + this.txFilter.add(tx.hash()); + this.emit('tx', tx, peer); + return; + } + + if (!requested) { + if (this.mempool.hasReject(tx.hash())) { + throw new VerifyError(tx, + 'alreadyknown', + 'txn-already-in-mempool', + 0); + } + } + + try { + missing = yield this.mempool.addTX(tx); + } catch (err) { + if (err.type === 'VerifyError') { + peer.reject(tx, err.code, err.reason, err.score); + this.emit('error', err, peer); + return; + } + throw err; + } + + if (missing) { + this.logger.debug( + 'Requesting %d missing transactions (%s).', + missing.length, peer.hostname); + + this.getTX(peer, missing); + } + + this.emit('tx', tx, peer); +}); + +/** + * Handle peer reject event. + * @private + * @param {Peer} peer + * @param {RejectPacket} packet + */ + +Pool.prototype.handleReject = co(function* handleReject(peer, packet) { + var entry; + + this.logger.warning( + 'Received reject (%s): msg=%s code=%s reason=%s hash=%s.', + peer.hostname, + packet.message, + packet.getCode(), + packet.reason, + packet.rhash()); + + this.emit('reject', packet, peer); + + if (!packet.hash) + return; + + entry = this.invMap.get(packet.hash); + + if (!entry) + return; + + entry.reject(peer); +}); + +/** + * Handle `mempool` packet. + * @private + * @param {Peer} peer + * @param {MempoolPacket} packet + */ + +Pool.prototype.handleMempool = co(function* handleMempool(peer, packet) { + var items = []; + var i, hashes; + + if (!this.mempool) + return; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + hashes = this.mempool.getSnapshot(); + + for (i = 0; i < hashes.length; i++) + items.push(new InvItem(invTypes.TX, hashes[i])); + + this.logger.debug('Sending mempool snapshot (%s).', peer.hostname); + + this.emit('mempool', peer); + + peer.sendInv(items); +}); + +/** + * Handle `filterload` packet. + * @private + * @param {Peer} peer + * @param {FilterLoadPacket} packet + */ + +Pool.prototype.handleFilterLoad = co(function* handleFilterLoad(peer, packet) { + this.emit('filterload', packet, peer); +}); + +/** + * Handle `filteradd` packet. + * @private + * @param {Peer} peer + * @param {FilterAddPacket} packet + */ + +Pool.prototype.handleFilterAdd = co(function* handleFilterAdd(peer, packet) { + this.emit('filteradd', packet, peer); +}); + +/** + * Handle `filterclear` packet. + * @private + * @param {Peer} peer + * @param {FilterClearPacket} packet + */ + +Pool.prototype.handleFilterClear = co(function* handleFilterClear(peer, packet) { + this.emit('filterclear', peer, packet); +}); + +/** + * Handle `merkleblock` packet. + * @private + * @param {Peer} peer + * @param {MerkleBlockPacket} block + */ + +Pool.prototype.handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) { + var block = packet.block; + var ret = new VerifyResult(); + + // Potential DoS. + if (!this.options.spv) { + this.logger.warning( + 'Peer sent unsolicited merkleblock (%s).', + peer.hostname); + peer.increaseBan(100); + return; + } + + if (peer.lastMerkle) { + this.logger.warning( + 'Peer sent a merkleblock prematurely (%s).', + peer.hostname); + peer.increaseBan(100); + return; + } + + if (!block.verify(ret)) { + this.logger.warning( + 'Peer sent an invalid merkleblock (%s).', + peer.hostname); + peer.reject(block, 'invalid', ret.reason, ret.score); + return; + } + + peer.lastMerkle = block; + peer.waitingTX = block.matches.length; + + if (peer.waitingTX === 0) + yield peer.flushMerkle(); +}); + +/** + * Handle `sendcmpct` packet. + * @private + * @param {Peer} peer + * @param {FeeFilterPacket} packet + */ + +Pool.prototype.handleFeeFilter = co(function* handleFeeFilter(peer, packet) { + this.emit('feefilter', packet, peer); +}); + +/** + * Handle `sendcmpct` packet. + * @private + * @param {Peer} peer + * @param {SendCmpctPacket} packet + */ + +Pool.prototype.handleSendCmpct = co(function* handleSendCmpct(peer, packet) { + this.emit('sendcmpct', packet, peer); +}); + /** * Handle `cmpctblock` packet. * @private * @param {Peer} peer - * @param {CompactBlock} block + * @param {CompactBlockPacket} packet */ -Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, block) { +Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { + var block = packet.block; var hash = block.hash('hex'); var witness = this.options.witness; var result; @@ -1869,7 +2082,7 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, block) { this.logger.debug( 'Received full compact block %s (%s).', block.rhash(), peer.hostname); - yield this.handleBlock(peer, block.toBlock()); + yield this.addBlock(peer, block.toBlock()); return; } @@ -1893,49 +2106,15 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, block) { peer.send(new packets.GetBlockTxnPacket(block.toRequest())); }); -/** - * Handle `blocktxn` packet. - * @private - * @param {Peer} peer - * @param {TXResponse} res - */ - -Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, res) { - var block = peer.compactBlocks.get(res.hash); - - if (!block) { - this.logger.debug('Peer sent unsolicited blocktxn (%s).', peer.hostname); - return; - } - - peer.compactBlocks.remove(res.hash); - - assert(this.compactBlocks.has(res.hash)); - this.compactBlocks.remove(res.hash); - - if (!block.fillMissing(res)) { - peer.increaseBan(100); - this.logger.warning('Peer sent non-full blocktxn (%s).', peer.hostname); - return; - } - - this.logger.debug( - 'Filled compact block %s (%s).', - block.rhash(), peer.hostname); - - this.emit('blocktxn', res, peer); - - yield this.handleBlock(peer, block.toBlock()); -}); - /** * Handle `getblocktxn` packet. * @private * @param {Peer} peer - * @param {TXRequest} req + * @param {GetBlockTxnPacket} packet */ -Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, req) { +Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, packet) { + var req = packet.request; var res, item, block, height; if (this.chain.options.spv) @@ -1976,35 +2155,106 @@ Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, req) { }); /** - * Handle `mempool` packet. + * Handle `blocktxn` packet. * @private * @param {Peer} peer - * @param {MempoolPacket} packet + * @param {BlockTxnPacket} packet */ -Pool.prototype.handleMempool = co(function* handleMempool(peer, packet) { - var items = []; - var i, hashes; +Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, packet) { + var res = packet.response; + var block = peer.compactBlocks.get(res.hash); - if (!this.mempool) + if (!block) { + this.logger.debug('Peer sent unsolicited blocktxn (%s).', peer.hostname); return; + } - if (!this.chain.synced) + peer.compactBlocks.remove(res.hash); + + assert(this.compactBlocks.has(res.hash)); + this.compactBlocks.remove(res.hash); + + if (!block.fillMissing(res)) { + peer.increaseBan(100); + this.logger.warning('Peer sent non-full blocktxn (%s).', peer.hostname); return; + } - if (this.options.selfish) - return; + this.logger.debug( + 'Filled compact block %s (%s).', + block.rhash(), peer.hostname); - hashes = this.mempool.getSnapshot(); + this.emit('blocktxn', res, peer); - for (i = 0; i < hashes.length; i++) - items.push(new InvItem(invTypes.TX, hashes[i])); + yield this.addBlock(peer, block.toBlock()); +}); - this.logger.debug('Sending mempool snapshot (%s).', peer.hostname); +/** + * Handle `encinit` packet. + * @private + * @param {Peer} peer + * @param {EncinitPacket} packet + */ - this.emit('mempool', peer); +Peer.prototype.handleEncinit = co(function* handleEncinit(peer, packet) { + this.emit('encinit', packet, peer); +}); - peer.sendInv(items); +/** + * Handle `encack` packet. + * @private + * @param {Peer} peer + * @param {EncackPacket} packet + */ + +Peer.prototype.handleEncack = co(function* handleEncack(peer, packet) { + this.emit('encack', packet, peer); +}); + +/** + * Handle `authchallenge` packet. + * @private + * @param {Peer} peer + * @param {AuthChallengePacket} packet + */ + +Peer.prototype.handleAuthChallenge = co(function* handleAuthChallenge(peer, packet) { + this.emit('authchallenge', packet, peer); +}); + +/** + * Handle `authreply` packet. + * @private + * @param {Peer} peer + * @param {AuthReplyPacket} packet + */ + +Peer.prototype.handleAuthReply = co(function* handleAuthReply(peer, packet) { + this.emit('authreply', packet, peer); +}); + +/** + * Handle `authpropose` packet. + * @private + * @param {Peer} peer + * @param {AuthProposePacket} packet + */ + +Peer.prototype.handleAuthPropose = co(function* handleAuthPropose(peer, packet) { + this.emit('authpropose', packet, peer); +}); + +/** + * Handle `unknown` packet. + * @private + * @param {Peer} peer + * @param {UnknownPacket} packet + */ + +Peer.prototype.handleUnknown = co(function* handleUnknown(peer, packet) { + this.logger.warning('Unknown packet: %s.', packet.cmd); + this.emit('unknown', packet, peer); }); /**