From 2d952306e6484958bc228f62477657a1bdc6cece Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 19 Jan 2017 13:44:01 -0800 Subject: [PATCH] net: refactor packet handling. --- lib/net/packets.js | 5 - lib/net/peer.js | 866 ++++------------------------------------ lib/net/pool.js | 958 +++++++++++++++++++++++++++++++++++++++------ 3 files changed, 917 insertions(+), 912 deletions(-) diff --git a/lib/net/packets.js b/lib/net/packets.js index 6358d32c..20992a34 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -10,18 +10,13 @@ var common = require('./common'); var util = require('../utils/util'); var assert = require('assert'); -var crypto = require('../crypto/crypto'); -var ec = require('../crypto/ec'); var Bloom = require('../utils/bloom'); var bip152 = require('./bip152'); var NetAddress = require('../primitives/netaddress'); -var Coin = require('../primitives/coin'); var Headers = require('../primitives/headers'); var InvItem = require('../primitives/invitem'); var MemBlock = require('../primitives/memblock'); var MerkleBlock = require('../primitives/merkleblock'); -var Outpoint = require('../primitives/outpoint'); -var Output = require('../primitives/output'); var TX = require('../primitives/tx'); var BufferReader = require('../utils/reader'); var StaticWriter = require('../utils/staticwriter'); diff --git a/lib/net/peer.js b/lib/net/peer.js index 60f81f05..5670509c 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -608,36 +608,6 @@ Peer.prototype.finalize = co(function* finalize() { this.invTimer = setInterval(function() { self.flushInv(); }, Peer.INV_INTERVAL); - - // Advertise our address. - if (!this.pool.address.isNull() - && !this.options.selfish - && this.options.listen) { - this.send(new packets.AddrPacket([this.pool.address])); - } - - // We want compact blocks! - if (this.options.compact) { - if (this.version >= common.COMPACT_VERSION) - this.sendCompact(); - } - - // Find some more peers. - if (!this.pool.hosts.isFull()) - this.sendGetAddr(); - - // Relay our spv filter if we have one. - this.updateWatch(); - - // Announce our currently broadcasted items. - this.announceList(); - - // Set a fee rate filter. - if (this.pool.feeRate !== -1) - this.sendFeeRate(this.pool.feeRate); - - // Start syncing the chain. - this.sync(); }); /** @@ -755,40 +725,6 @@ Peer.prototype.announceTX = function announceTX(txs) { this.sendInv(inv); }; -/** - * Announce broadcast list to peer. - */ - -Peer.prototype.announceList = function announceList() { - var blocks = []; - var txs = []; - var hashes = this.pool.invMap.keys(); - var i, hash, item; - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - item = this.pool.invMap.get(hash); - - switch (item.type) { - case invTypes.BLOCK: - blocks.push(item.msg); - break; - case invTypes.TX: - txs.push(item.msg); - break; - default: - assert(false, 'Bad item type.'); - break; - } - } - - if (blocks.length > 0) - this.announceBlock(blocks); - - if (txs.length > 0) - this.announceTX(txs); -}; - /** * Send inv to a peer. * @param {InvItem[]} items @@ -885,6 +821,29 @@ Peer.prototype.sendHeaders = function sendHeaders(items) { } }; +/** + * Send a compact block. + * @private + * @param {Block} block + * @param {Boolean} witness + * @returns {Boolean} + */ + +Peer.prototype.sendCompactBlock = function sendCompactBlock(block, witness) { + // Try again with a new nonce + // if we get a siphash collision. + for (;;) { + try { + block = BIP152.CompactBlock.fromBlock(block, witness); + } catch (e) { + continue; + } + break; + } + + this.send(new packets.CmpctBlockPacket(block, witness)); +}; + /** * Send a `version` packet. */ @@ -943,14 +902,17 @@ Peer.prototype.sendPing = function sendPing() { * Send `filterload` to update the local bloom filter. */ -Peer.prototype.updateWatch = function updateWatch() { +Peer.prototype.sendFilterLoad = function sendFilterLoad(filter) { if (!this.handshake) return; if (!this.options.spv) return; - this.send(new packets.FilterLoadPacket(this.pool.spvFilter)); + if (!(this.services & services.BLOOM)) + return; + + this.send(new packets.FilterLoadPacket(filter)); }; /** @@ -1019,8 +981,6 @@ Peer.prototype.destroy = function destroy() { entry.reject(new Error('Peer was destroyed.')); } - this.compactBlocks.reset(); - this.locker.destroy(); this.emit('close', connected); @@ -1543,39 +1503,6 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { entry.resolve(packet); }); -/** - * Emit a block. - * @param {Block} block - * @returns {Promise} - */ - -Peer.prototype.emitBlock = co(function* emitBlock(block) { - this.emit('block', block); - yield this.pool.handleBlock(this, block); -}); - -/** - * Emit a block inv. - * @param {Hash[]} blocks - * @returns {Promise} - */ - -Peer.prototype.emitBlockInv = co(function* emitBlockInv(blocks) { - this.emit('blocks', blocks); - yield this.pool.handleBlockInv(this, blocks); -}); - -/** - * Emit headers. - * @param {Header[]} headers - * @returns {Promise} - */ - -Peer.prototype.emitHeaders = co(function* emitHeaders(headers) { - this.emit('headers', headers); - yield this.pool.handleHeaders(this, headers); -}); - /** * Flush merkle block once all matched * txs have been received. @@ -1586,36 +1513,14 @@ Peer.prototype.emitHeaders = co(function* emitHeaders(headers) { Peer.prototype.flushMerkle = co(function* flushMerkle() { assert(this.lastMerkle); - this.lastBlock = util.ms(); + this.emit('merkleblock', this.lastMerkle); - yield this.emitBlock(this.lastMerkle); + yield this.pool.handleBlock(this, this.lastMerkle); this.lastMerkle = null; this.waitingTX = 0; }); -/** - * Emit a transaction. - * @param {TX} tx - * @returns {Promise} - */ - -Peer.prototype.emitTX = co(function* emitTX(tx) { - this.emit('tx', tx); - yield this.pool.handleTX(this, tx); -}); - -/** - * Emit transaction inv. - * @param {Hash[]} txs - * @returns {Promise} - */ - -Peer.prototype.emitTXInv = co(function* emitTXInv(txs) { - this.emit('txs', txs); - yield this.pool.handleTXInv(this, txs); -}); - /** * Handle `filterload` packet. * @private @@ -1733,45 +1638,8 @@ Peer.prototype.handleFeeFilter = co(function* handleFeeFilter(packet) { */ Peer.prototype.handleGetHeaders = co(function* handleGetHeaders(packet) { - var headers = []; - var hash, entry; - - if (!this.chain.synced) - return; - - if (this.options.selfish) - return; - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - if (packet.locator.length > 0) { - hash = yield this.chain.findLocator(packet.locator); - if (hash) - hash = yield this.chain.db.getNextHash(hash); - } else { - hash = packet.stop; - } - - if (hash) - entry = yield this.chain.db.getEntry(hash); - - while (entry) { - headers.push(entry.toHeaders()); - - if (headers.length === 2000) - break; - - if (entry.hash === packet.stop) - break; - - entry = yield entry.getNext(); - } - - this.sendHeaders(headers); + this.emit('getheaders', packet); + yield this.pool.handleGetHeaders(this, packet); }); /** @@ -1781,41 +1649,8 @@ Peer.prototype.handleGetHeaders = co(function* handleGetHeaders(packet) { */ Peer.prototype.handleGetBlocks = co(function* handleGetBlocks(packet) { - var blocks = []; - var hash; - - if (!this.chain.synced) - return; - - if (this.options.selfish) - return; - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - hash = yield this.chain.findLocator(packet.locator); - - if (hash) - hash = yield this.chain.db.getNextHash(hash); - - while (hash) { - blocks.push(new InvItem(invTypes.BLOCK, hash)); - - if (hash === packet.stop) - break; - - if (blocks.length === 500) { - this.hashContinue = hash; - break; - } - - hash = yield this.chain.db.getNextHash(hash); - } - - this.sendInv(blocks); + this.emit('getblocks', packet); + yield this.pool.handleGetBlocks(this, packet); }); /** @@ -1865,9 +1700,11 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { } } + this.emit('version', packet); + this.send(new packets.VerackPacket()); - this.emit('version', packet); + yield this.pool.handleVersion(this, packet); }); /** @@ -1878,7 +1715,7 @@ Peer.prototype.handleVersion = co(function* handleVersion(packet) { Peer.prototype.handleVerack = co(function* handleVerack(packet) { this.ack = true; - this.emit('verack'); + this.emit('verack', packet); this.logger.debug('Received verack (%s).', this.hostname); }); @@ -1889,160 +1726,10 @@ Peer.prototype.handleVerack = co(function* handleVerack(packet) { */ Peer.prototype.handleMempool = co(function* handleMempool(packet) { - var items = []; - var i, hashes; - - if (!this.mempool) - return; - - if (!this.chain.synced) - return; - - if (this.options.selfish) - return; - - hashes = this.mempool.getSnapshot(); - - for (i = 0; i < hashes.length; i++) - items.push(new InvItem(invTypes.TX, hashes[i])); - - this.logger.debug('Sending mempool snapshot (%s).', this.hostname); - - this.sendInv(items); + this.emit('mempool', packet); + yield this.pool.handleMempool(this); }); -/** - * Get a block/tx from the broadcast map. - * @private - * @param {InvItem} item - * @returns {Promise} - */ - -Peer.prototype.getBroadcasted = function getBroadcasted(item) { - var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; - var entry = this.pool.invMap.get(item.hash); - - if (!entry) - return; - - if (type !== entry.type) { - this.logger.debug( - 'Peer requested item with the wrong type (%s).', - this.hostname); - return; - } - - this.logger.debug( - 'Peer requested %s %s as a %s packet (%s).', - item.isTX() ? 'tx' : 'block', - item.rhash(), - item.hasWitness() ? 'witness' : 'normal', - this.hostname); - - entry.ack(this); - - return entry.msg; -}; - -/** - * Get a block/tx either from the broadcast map, mempool, or blockchain. - * @private - * @param {InvItem} item - * @returns {Promise} - */ - -Peer.prototype.getItem = co(function* getItem(item) { - var entry = this.getBroadcasted(item); - - if (entry) - return entry; - - if (this.options.selfish) - return; - - if (item.isTX()) { - if (!this.mempool) - return; - return this.mempool.getTX(item.hash); - } - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - return yield this.chain.db.getBlock(item.hash); -}); - -/** - * Send a block from the broadcast list or chain. - * @private - * @param {InvItem} item - * @returns {Boolean} - */ - -Peer.prototype.sendBlock = co(function* sendBlock(item, witness) { - var block = this.getBroadcasted(item); - - // Check for a broadcasted item first. - if (block) { - this.send(new packets.BlockPacket(block, witness)); - return true; - } - - if (this.options.selfish - || this.chain.options.spv - || this.chain.options.prune) { - return false; - } - - // If we have the same serialization, we - // can write the raw binary to the socket. - if (witness === this.chain.options.witness) { - block = yield this.chain.db.getRawBlock(item.hash); - - if (!block) - return false; - - this.sendRaw('block', block); - - return true; - } - - block = yield this.chain.db.getBlock(item.hash); - - if (!block) - return false; - - this.send(new packets.BlockPacket(block, witness)); - - return true; -}); - -/** - * Send a compact block. - * @private - * @param {Block} block - * @param {Boolean} witness - * @returns {Boolean} - */ - -Peer.prototype.sendCompactBlock = function sendCompactBlock(block, witness) { - // Try again with a new nonce - // if we get a siphash collision. - for (;;) { - try { - block = BIP152.CompactBlock.fromBlock(block, witness); - } catch (e) { - continue; - } - break; - } - - this.send(new packets.CmpctBlockPacket(block, witness)); -}; - /** * Handle `getdata` packet. * @private @@ -2050,139 +1737,8 @@ Peer.prototype.sendCompactBlock = function sendCompactBlock(block, witness) { */ Peer.prototype.handleGetData = co(function* handleGetData(packet) { - var notFound = []; - var txs = 0; - var blocks = 0; - var unknown = -1; - var items = packet.items; - var i, j, item, tx, block, result, height; - - if (items.length > 50000) - throw new Error('getdata size too large (' + items.length + ').'); - - for (i = 0; i < items.length; i++) { - item = items[i]; - - if (item.isTX()) { - tx = yield this.getItem(item); - - if (!tx) { - notFound.push(item); - continue; - } - - // Coinbases are an insta-ban from any node. - // This should technically never happen, but - // it's worth keeping here just in case. A - // 24-hour ban from any node is rough. - if (tx.isCoinbase()) { - notFound.push(item); - this.logger.warning('Failsafe: tried to relay a coinbase.'); - continue; - } - - this.send(new packets.TXPacket(tx, item.hasWitness())); - - txs++; - - continue; - } - - switch (item.type) { - case invTypes.BLOCK: - case invTypes.WITNESS_BLOCK: - result = yield this.sendBlock(item, item.hasWitness()); - if (!result) { - notFound.push(item); - continue; - } - blocks++; - break; - case invTypes.FILTERED_BLOCK: - case invTypes.WITNESS_FILTERED_BLOCK: - if (!this.spvFilter) { - notFound.push(item); - continue; - } - - block = yield this.getItem(item); - - if (!block) { - notFound.push(item); - continue; - } - - block = block.toMerkle(this.spvFilter); - - this.send(new packets.MerkleBlockPacket(block)); - - for (j = 0; j < block.txs.length; j++) { - tx = block.txs[j]; - this.send(new packets.TXPacket(tx, item.hasWitness())); - txs++; - } - - blocks++; - - break; - case invTypes.CMPCT_BLOCK: - height = yield this.chain.db.getHeight(item.hash); - - // Fallback to full block. - if (height < this.chain.tip.height - 10) { - result = yield this.sendBlock(item, this.compactWitness); - if (!result) { - notFound.push(item); - continue; - } - blocks++; - break; - } - - block = yield this.getItem(item); - - if (!block) { - notFound.push(item); - continue; - } - - this.sendCompactBlock(block, this.compactWitness); - - blocks++; - - break; - default: - unknown = item.type; - notFound.push(item); - continue; - } - - if (item.hash === this.hashContinue) { - this.sendInv([new InvItem(invTypes.BLOCK, this.chain.tip.hash)]); - this.hashContinue = null; - } - } - - if (notFound.length > 0) - this.send(new packets.NotFoundPacket(notFound)); - - if (txs > 0) { - this.logger.debug( - 'Served %d txs with getdata (notfound=%d) (%s).', - txs, notFound.length, this.hostname); - } - - if (blocks > 0) { - this.logger.debug( - 'Served %d blocks with getdata (notfound=%d) (%s).', - blocks, notFound.length, this.hostname); - } - - if (unknown !== -1) { - this.logger.warning( - 'Peer sent an unknown getdata type: %s (%d).', - unknown, this.hostname); - } + this.emit('getdata', packet.items); + yield this.pool.handleGetData(this, packet.items); }); /** @@ -2193,6 +1749,7 @@ Peer.prototype.handleGetData = co(function* handleGetData(packet) { Peer.prototype.handleNotFound = co(function* handleNotFound(packet) { this.emit('notfound', packet.items); + yield this.pool.handleNotFound(this, packet.items); }); /** @@ -2202,27 +1759,8 @@ Peer.prototype.handleNotFound = co(function* handleNotFound(packet) { */ Peer.prototype.handleAddr = co(function* handleAddr(packet) { - var now = this.network.now(); - var addrs = packet.items; - var i, addr; - - for (i = 0; i < addrs.length; i++) { - addr = addrs[i]; - - if (addr.ts <= 100000000 || addr.ts > now + 10 * 60) - addr.ts = now - 5 * 24 * 60 * 60; - - this.addrFilter.add(addr.hostname, 'ascii'); - } - - this.logger.info( - 'Received %d addrs (hosts=%d, peers=%d) (%s).', - addrs.length, - this.pool.hosts.size(), - this.pool.peers.size(), - this.hostname); - - this.emit('addr', addrs); + this.emit('addr', packet.items); + yield this.pool.handleAddr(this, packet.items); }); /** @@ -2283,42 +1821,8 @@ Peer.prototype.handlePong = co(function* handlePong(packet) { */ Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) { - var items = []; - var i, addrs, addr; - - if (this.options.selfish) - return; - - if (this.sentAddr) { - this.logger.debug('Ignoring repeated getaddr (%s).', this.hostname); - return; - } - - this.sentAddr = true; - - addrs = this.pool.hosts.toArray(); - - for (i = 0; i < addrs.length; i++) { - addr = addrs[i]; - - if (!this.addrFilter.added(addr.hostname, 'ascii')) - continue; - - items.push(addr); - - if (items.length === 1000) - break; - } - - if (items.length === 0) - return; - - this.logger.debug( - 'Sending %d addrs to peer (%s)', - items.length, - this.hostname); - - this.send(new packets.AddrPacket(items)); + this.emit('getaddr', packet); + yield this.pool.handleGetAddr(this, packet); }); /** @@ -2328,50 +1832,8 @@ Peer.prototype.handleGetAddr = co(function* handleGetAddr(packet) { */ Peer.prototype.handleInv = co(function* handleInv(packet) { - var items = packet.items; - var blocks = []; - var txs = []; - var unknown = -1; - var i, item; - - if (items.length > 50000) { - this.increaseBan(100); - return; - } - - for (i = 0; i < items.length; i++) { - item = items[i]; - switch (item.type) { - case invTypes.BLOCK: - blocks.push(item.hash); - break; - case invTypes.TX: - txs.push(item.hash); - break; - default: - unknown = item.type; - continue; - } - this.invFilter.add(item.hash, 'hex'); - } - - this.emit('inv', items); - - if (blocks.length > 0) - yield this.emitBlockInv(blocks); - - if (txs.length > 0) - yield this.emitTXInv(txs); - - this.logger.debug( - 'Received inv packet with %d items: blocks=%d txs=%d (%s).', - items.length, blocks.length, txs.length, this.hostname); - - if (unknown !== -1) { - this.logger.warning( - 'Peer sent an unknown inv type: %d (%s).', - unknown, this.hostname); - } + this.emit('inv', packet.items); + yield this.pool.handleInv(this, packet.items); }); /** @@ -2381,18 +1843,8 @@ Peer.prototype.handleInv = co(function* handleInv(packet) { */ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { - var headers = packet.items; - - this.logger.debug( - 'Received headers packet with %d items (%s).', - headers.length, this.hostname); - - if (headers.length > 2000) { - this.increaseBan(100); - return; - } - - yield this.emitHeaders(headers); + this.emit('headers', packet.items); + yield this.pool.handleHeaders(this, packet.items); }); /** @@ -2420,9 +1872,9 @@ Peer.prototype.handleBlock = co(function* handleBlock(packet) { return; } - this.lastBlock = util.ms(); + this.emit('block', packet.block); - yield this.emitBlock(packet.block); + yield this.pool.handleBlock(this, packet.block); }); /** @@ -2444,7 +1896,9 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { } } - yield this.emitTX(tx); + this.emit('tx', tx); + + yield this.pool.handleTX(this, tx); }); /** @@ -2454,19 +1908,8 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { */ Peer.prototype.handleReject = co(function* handleReject(reject) { - var entry; - this.emit('reject', reject); - - if (!reject.hash) - return; - - entry = this.pool.invMap.get(reject.hash); - - if (!entry) - return; - - entry.reject(this); + yield this.pool.handleReject(this, reject); }); /** @@ -2613,60 +2056,8 @@ Peer.prototype.handleSendCmpct = co(function* handleSendCmpct(packet) { */ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { - var block = packet.block; - var hash = block.hash('hex'); - var ret = new VerifyResult(); - var result; - - if (!this.options.compact) { - this.logger.info('Peer sent unsolicited cmpctblock (%s).', this.hostname); - return; - } - - if (!this.mempool) { - this.logger.warning('Requesting compact blocks without a mempool!'); - return; - } - - if (this.compactBlocks.has(hash)) { - this.logger.debug( - 'Peer sent us a duplicate compact block (%s).', - this.hostname); - return; - } - - if (!block.verify(ret)) { - this.logger.debug( - 'Peer sent an invalid compact block (%s).', - this.hostname); - this.reject(block, 'invalid', ret.reason, ret.score); - return; - } - - result = block.fillMempool(this.options.witness, this.mempool); - - if (result) { - this.lastBlock = util.ms(); - this.logger.debug( - 'Received full compact block %s (%s).', - block.rhash(), this.hostname); - yield this.emitBlock(block.toBlock()); - return; - } - - if (this.compactBlocks.size >= 10) { - this.logger.warning('Compact block DoS attempt (%s).', this.hostname); - this.destroy(); - return; - } - - this.compactBlocks.set(hash, block); - - this.send(new packets.GetBlockTxnPacket(block.toRequest())); - - this.logger.debug( - 'Received semi-full compact block %s (%s).', - block.rhash(), this.hostname); + this.emit('cmpctblock', packet.block); + yield this.pool.handleCmpctBlock(this, packet.block); }); /** @@ -2676,44 +2067,8 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { */ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { - var req = packet.request; - var res, item, block, height; - - if (this.chain.options.spv) - return; - - if (this.chain.options.prune) - return; - - if (this.options.selfish) - return; - - item = new InvItem(invTypes.BLOCK, req.hash); - - block = yield this.getItem(item); - - if (!block) { - this.logger.debug( - 'Peer sent getblocktxn for non-existent block (%s).', - this.hostname); - this.increaseBan(100); - return; - } - - height = yield this.chain.db.getHeight(req.hash); - - if (height < this.chain.tip.height - 15) { - this.logger.debug( - 'Peer sent a getblocktxn for a block > 15 deep (%s)', - this.hostname); - return; - } - - res = BIP152.TXResponse.fromBlock(block, req); - - this.send(new packets.BlockTxnPacket(res, this.compactWitness)); - - this.emit('blocktxn', req); + this.emit('getblocktxn', packet.request); + yield this.pool.handleGetBlockTxn(this, packet.request); }); /** @@ -2723,32 +2078,8 @@ Peer.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(packet) { */ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { - var res = packet.response; - var block = this.compactBlocks.get(res.hash); - - if (!block) { - this.logger.debug('Peer sent unsolicited blocktxn (%s).', this.hostname); - this.compactBlocks.reset(); - return; - } - - this.compactBlocks.remove(res.hash); - - if (!block.fillMissing(res)) { - this.increaseBan(100); - this.logger.warning('Peer sent non-full blocktxn (%s).', this.hostname); - return; - } - - this.logger.debug( - 'Filled compact block %s (%s).', - block.rhash(), this.hostname); - - this.lastBlock = util.ms(); - - this.emit('getblocktxn', res); - - yield this.emitBlock(block.toBlock()); + this.emit('blocktxn', packet.response); + yield this.pool.handleGetBlockTxn(this, packet.response); }); /** @@ -2908,81 +2239,32 @@ Peer.prototype.reject = function reject(obj, code, reason, score) { this.increaseBan(score); }; -/** - * Send `getblocks` to peer after building - * locator and resolving orphan root. - * @param {Hash} tip - Tip to build chain locator from. - * @param {Hash} orphan - Orphan hash to resolve. - * @returns {Promise} - */ - -Peer.prototype.resolveOrphan = co(function* resolveOrphan(tip, orphan) { - var root, locator; - - assert(orphan); - - locator = yield this.chain.getLocator(tip); - root = this.chain.getOrphanRoot(orphan); - - // Was probably resolved. - if (!root) { - this.logger.debug('Orphan root was already resolved.'); - return; - } - - this.sendGetBlocks(locator, root); -}); - -/** - * Send `getheaders` to peer after building locator. - * @param {Hash} tip - Tip to build chain locator from. - * @param {Hash?} stop - * @returns {Promise} - */ - -Peer.prototype.getHeaders = co(function* getHeaders(tip, stop) { - var locator = yield this.chain.getLocator(tip); - this.sendGetHeaders(locator, stop); -}); - -/** - * Send `getblocks` to peer after building locator. - * @param {Hash} tip - Tip hash to build chain locator from. - * @param {Hash?} stop - * @returns {Promise} - */ - -Peer.prototype.getBlocks = co(function* getBlocks(tip, stop) { - var locator = yield this.chain.getLocator(tip); - this.sendGetBlocks(locator, stop); -}); - /** * Start syncing from peer. * @returns {Promise} */ Peer.prototype.sync = co(function* sync() { - var tip, checkpoint; + var locator, tip, checkpoint; if (!this.pool.syncing) - return; + return false; if (!this.handshake) - return; + return false; if (this.syncSent) - return; + return false; if (!(this.services & services.NETWORK)) - return; + return false; if (this.options.witness && !this.hasWitness()) - return; + return false; if (!this.isLoader()) { if (!this.chain.synced) - return; + return false; } // Ask for the mempool if we're synced. @@ -2999,10 +2281,14 @@ Peer.prototype.sync = co(function* sync() { tip = this.chain.tip; checkpoint = this.pool.nextCheckpoint; this.sendGetHeaders([tip.hash], checkpoint.hash); - return; + return true; } - return yield this.getBlocks(); + locator = yield this.chain.getLocator(); + + this.sendGetBlocks(locator); + + return true; }); /** diff --git a/lib/net/pool.js b/lib/net/pool.js index 074df114..1562e19c 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -18,6 +18,7 @@ var errors = require('../protocol/errors'); var NetAddress = require('../primitives/netaddress'); var Address = require('../primitives/address'); var BIP150 = require('./bip150'); +var BIP152 = require('./bip152'); var Bloom = require('../utils/bloom'); var ec = require('../crypto/ec'); var Lock = require('../utils/lock'); @@ -30,6 +31,7 @@ var dns = require('./dns'); var HostList = require('./hostlist'); var InvItem = require('../primitives/invitem'); var Map = require('../utils/map'); +var packets = require('./packets'); var invTypes = InvItem.types; var VerifyError = errors.VerifyError; @@ -106,6 +108,7 @@ function Pool(options) { this.spvFilter = null; this.txFilter = null; this.requestMap = new Map(); + this.compactBlocks = new Map(); this.invMap = new Map(); this.scheduled = false; this.pendingWatch = null; @@ -584,28 +587,6 @@ Pool.prototype.forceSync = function forceSync() { } }; -/** - * Stop the blockchain sync. - */ - -Pool.prototype.stopSync = co(function* stopSync() { - var peer; - - if (!this.syncing) - return; - - this.syncing = false; - - if (!this.loaded) - return; - - for (peer = this.peers.head(); peer; peer = peer.next) { - if (!peer.outbound) - continue; - peer.syncSent = false; - } -}); - /** * Send `mempool` to all peers. */ @@ -688,22 +669,6 @@ Pool.prototype.bindPeer = function bindPeer(peer) { peer.on('error', function(err) { self.emit('error', err, peer); }); - - peer.on('version', function(packet) { - self.handleVersion(peer, packet); - }); - - peer.on('addr', function(addrs) { - self.handleAddr(peer, addrs); - }); - - peer.on('reject', function(reject) { - self.handleReject(peer, reject); - }); - - peer.on('notfound', function(items) { - self.handleNotFound(peer, items); - }); }; /** @@ -726,14 +691,44 @@ Pool.prototype.handleConnect = function handleConnect(peer) { */ Pool.prototype.handleOpen = function handleOpen(peer) { - if (!peer.outbound) - return; + // Advertise our address. + if (!this.address.isNull() + && !this.options.selfish + && this.options.listen) { + peer.send(new packets.AddrPacket([this.address])); + } - this.hosts.markAck(peer.hostname, peer.services); + // We want compact blocks! + if (this.options.compact) { + if (peer.version >= common.COMPACT_VERSION) + peer.sendCompact(); + } - // If we don't have an ack'd loader yet, use this peer. - if (!this.peers.load || !this.peers.load.handshake) - this.setLoader(peer); + // Find some more peers. + if (!this.hosts.isFull()) + peer.sendGetAddr(); + + // Relay our spv filter if we have one. + if (this.spvFilter) + peer.sendFilterLoad(this.spvFilter); + + // Announce our currently broadcasted items. + this.announceList(peer); + + // Set a fee rate filter. + if (this.feeRate !== -1) + peer.sendFeeRate(this.feeRate); + + // Start syncing the chain. + peer.sync(); + + if (peer.outbound) { + this.hosts.markAck(peer.hostname, peer.services); + + // If we don't have an ack'd loader yet, use this peer. + if (!this.peers.load || !this.peers.load.handshake) + this.setLoader(peer); + } }; /** @@ -770,7 +765,7 @@ Pool.prototype.handleClose = co(function* handleClose(peer, connected) { * @param {VersionPacket} packet */ -Pool.prototype.handleVersion = function handleVersion(peer, packet) { +Pool.prototype.handleVersion = co(function* handleVersion(peer, packet) { this.logger.info( 'Received version (%s): version=%d height=%d services=%s agent=%s', peer.hostname, @@ -782,7 +777,7 @@ Pool.prototype.handleVersion = function handleVersion(peer, packet) { this.network.time.add(peer.hostname, packet.ts); this.emit('version', packet, peer); -}; +}); /** * Handle peer addr event. @@ -791,29 +786,42 @@ Pool.prototype.handleVersion = function handleVersion(peer, packet) { * @param {NetAddress[]} addrs */ -Pool.prototype.handleAddr = function handleAddr(peer, addrs) { +Pool.prototype.handleAddr = co(function* handleAddr(peer, addrs) { + var now = this.network.now(); var services = this.options.requiredServices; var i, addr; this.emit('addr', addrs, peer); - if (this.options.noDiscovery) - return; - for (i = 0; i < addrs.length; i++) { addr = addrs[i]; + peer.addrFilter.add(addr.hostname, 'ascii'); + + if (this.options.noDiscovery) + continue; + if (!addr.isRoutable()) continue; if (!addr.hasServices(services)) continue; + if (addr.ts <= 100000000 || addr.ts > now + 10 * 60) + addr.ts = now - 5 * 24 * 60 * 60; + this.hosts.add(addr, peer.address); } + this.logger.info( + 'Received %d addrs (hosts=%d, peers=%d) (%s).', + addrs.length, + this.hosts.size(), + this.peers.size(), + peer.hostname); + this.fillOutbound(); -}; +}); /** * Handle `block` packet. Attempt to add to chain. @@ -877,6 +885,8 @@ Pool.prototype._handleBlock = co(function* handleBlock(peer, block) { } } + peer.lastBlock = util.ms(); + try { yield this.chain.add(block); } catch (err) { @@ -884,15 +894,15 @@ Pool.prototype._handleBlock = co(function* handleBlock(peer, block) { if (err.reason === 'bad-prevblk') { if (this.headersFirst) { peer.increaseBan(10); - this.emit('error', err); + this.emit('error', err, peer); return; } this.logger.debug('Peer sent an orphan block. Resolving.'); - yield peer.resolveOrphan(null, hash); + yield this.resolveOrphan(peer, hash); return; } peer.reject(block, err.code, err.reason, err.score); - this.emit('error', err); + this.emit('error', err, peer); return; } throw err; @@ -1017,7 +1027,7 @@ Pool.prototype._handleTX = co(function* handleTX(peer, tx) { } catch (err) { if (err.type === 'VerifyError') { peer.reject(tx, err.code, err.reason, err.score); - this.emit('error', err); + this.emit('error', err, peer); return; } throw err; @@ -1073,23 +1083,20 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { if (!peer.isLoader()) return; - this.logger.debug( - 'Received %s headers from peer (%s).', - headers.length, - peer.hostname); - - this.emit('headers', headers); + if (headers.length > 2000) { + this.increaseBan(100); + return; + } if (headers.length === 0) return; + assert(this.headerChain.size > 0); + for (i = 0; i < headers.length; i++) { header = headers[i]; hash = header.hash('hex'); last = this.headerChain.tail; - - assert(last); - height = last.height + 1; if (header.prevBlock !== last.hash) { @@ -1118,6 +1125,13 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { this.headerChain.push(node); } + this.emit('headers', headers, peer); + + this.logger.debug( + 'Received %s headers from peer (%s).', + headers.length, + peer.hostname); + // Request the hashes we just added. if (isCheckpoint) { this.headerChain.shift(); @@ -1176,9 +1190,77 @@ Pool.prototype.getNextCheckpoint = function getNextCheckpoint(height) { } }; +/** + * Handle `inv` packet. + * @private + * @param {Peer} peer + * @param {InvItem[]} items + */ + +Pool.prototype.handleInv = co(function* handleInv(peer, items) { + var unlock = yield this.locker.lock(); + try { + return yield this._handleInv(peer, items); + } finally { + unlock(); + } +}); + +/** + * Handle `inv` packet (without a lock). + * @private + * @param {Peer} peer + * @param {InvItem[]} items + */ + +Pool.prototype._handleInv = co(function* handleInv(peer, items) { + var blocks = []; + var txs = []; + var unknown = -1; + var i, item; + + if (items.length > 50000) { + peer.increaseBan(100); + return; + } + + for (i = 0; i < items.length; i++) { + item = items[i]; + switch (item.type) { + case invTypes.BLOCK: + blocks.push(item.hash); + break; + case invTypes.TX: + txs.push(item.hash); + break; + default: + unknown = item.type; + continue; + } + peer.invFilter.add(item.hash, 'hex'); + } + + this.logger.debug( + 'Received inv packet with %d items: blocks=%d txs=%d (%s).', + items.length, blocks.length, txs.length, peer.hostname); + + if (unknown !== -1) { + this.logger.warning( + 'Peer sent an unknown inv type: %d (%s).', + unknown, peer.hostname); + } + + this.emit('inv', items, peer); + + if (blocks.length > 0) + yield this.handleBlockInv(peer, blocks); + + if (txs.length > 0) + yield this.handleTXInv(peer, txs); +}); + /** * Handle `inv` packet from peer (containing only BLOCK types). - * Potentially request headers if headers mode is enabled. * @private * @param {Peer} peer * @param {Hash[]} hashes @@ -1186,23 +1268,6 @@ Pool.prototype.getNextCheckpoint = function getNextCheckpoint(height) { */ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { - var unlock = yield this.locker.lock(); - try { - return yield this._handleBlockInv(peer, hashes); - } finally { - unlock(); - } -}); - -/** - * Handle `inv` packet from peer without a lock. - * @private - * @param {Peer} peer - * @param {Hash[]} hashes - * @returns {Promise} - */ - -Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { var items = []; var i, hash; @@ -1225,15 +1290,13 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { hashes.length, peer.hostname); - this.emit('blocks', hashes); - for (i = 0; i < hashes.length; i++) { hash = hashes[i]; // Resolve orphan chain. if (this.chain.hasOrphan(hash)) { this.logger.debug('Received known orphan hash (%s).', peer.hostname); - yield peer.resolveOrphan(null, hash); + yield this.resolveOrphan(peer, hash); continue; } @@ -1252,7 +1315,7 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { // the hashContinue on the remote node). if (i === hashes.length - 1) { this.logger.debug('Received existing hash (%s).', peer.hostname); - yield peer.getBlocks(hash, null); + yield this.getBlocks(peer, hash); } } @@ -1267,27 +1330,9 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { */ Pool.prototype.handleTXInv = co(function* handleTXInv(peer, hashes) { - var unlock = yield this.locker.lock(); - try { - return yield this._handleTXInv(peer, hashes); - } finally { - unlock(); - } -}); - -/** - * Handle peer inv packet (txs). - * @private - * @param {Peer} peer - * @param {Hash[]} hashes - */ - -Pool.prototype._handleTXInv = co(function* handleTXInv(peer, hashes) { var items = []; var i, hash; - this.emit('txs', hashes, peer); - if (this.syncing && !this.chain.synced) return; @@ -1303,6 +1348,41 @@ Pool.prototype._handleTXInv = co(function* handleTXInv(peer, hashes) { this.getTX(peer, items); }); +/** + * Announce broadcast list to peer. + * @param {Peer} peer + */ + +Pool.prototype.announceList = function announceList(peer) { + var blocks = []; + var txs = []; + var hashes = this.invMap.keys(); + var i, hash, item; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + item = this.invMap.get(hash); + + switch (item.type) { + case invTypes.BLOCK: + blocks.push(item.msg); + break; + case invTypes.TX: + txs.push(item.msg); + break; + default: + assert(false, 'Bad item type.'); + break; + } + } + + if (blocks.length > 0) + peer.announceBlock(blocks); + + if (txs.length > 0) + peer.announceTX(txs); +}; + /** * Handle peer reject event. * @private @@ -1310,7 +1390,9 @@ Pool.prototype._handleTXInv = co(function* handleTXInv(peer, hashes) { * @param {RejectPacket} reject */ -Pool.prototype.handleReject = function handleReject(peer, reject) { +Pool.prototype.handleReject = co(function* handleReject(peer, reject) { + var entry; + this.logger.warning( 'Received reject (%s): msg=%s code=%s reason=%s hash=%s.', peer.hostname, @@ -1320,36 +1402,624 @@ Pool.prototype.handleReject = function handleReject(peer, reject) { reject.rhash()); this.emit('reject', reject, peer); -}; + + if (!reject.hash) + return; + + entry = this.invMap.get(reject.hash); + + if (!entry) + return; + + entry.reject(peer); +}); /** * Handle peer notfound packet. * @private - * @param {InvItem[]} items * @param {Peer} peer + * @param {InvItem[]} items */ -Pool.prototype.handleNotFound = function handleNotFound(peer, items) { +Pool.prototype.handleNotFound = co(function* handleNotFound(peer, items) { var i, item; for (i = 0; i < items.length; i++) { item = items[i]; this.fulfill(peer, item.hash); } +}); + +/** + * Handle `getaddr` packet. + * @private + * @param {Peer} peer + * @param {GetAddrPacket} packet + */ + +Pool.prototype.handleGetAddr = co(function* handleGetAddr(peer, packet) { + var items = []; + var i, addrs, addr; + + if (this.options.selfish) + return; + + if (peer.sentAddr) { + this.logger.debug('Ignoring repeated getaddr (%s).', peer.hostname); + return; + } + + peer.sentAddr = true; + + addrs = this.hosts.toArray(); + + for (i = 0; i < addrs.length; i++) { + addr = addrs[i]; + + if (!peer.addrFilter.added(addr.hostname, 'ascii')) + continue; + + items.push(addr); + + if (items.length === 1000) + break; + } + + if (items.length === 0) + return; + + this.logger.debug( + 'Sending %d addrs to peer (%s)', + items.length, + peer.hostname); + + peer.send(new packets.AddrPacket(items)); +}); + +/** + * Handle `getheaders` packet. + * @private + * @param {Peer} peer + * @param {GetHeadersPacket} packet + */ + +Pool.prototype.handleGetHeaders = co(function* handleGetHeaders(peer, packet) { + var headers = []; + var hash, entry; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + if (packet.locator.length > 0) { + hash = yield this.chain.findLocator(packet.locator); + if (hash) + hash = yield this.chain.db.getNextHash(hash); + } else { + hash = packet.stop; + } + + if (hash) + entry = yield this.chain.db.getEntry(hash); + + while (entry) { + headers.push(entry.toHeaders()); + + if (headers.length === 2000) + break; + + if (entry.hash === packet.stop) + break; + + entry = yield entry.getNext(); + } + + peer.sendHeaders(headers); +}); + +/** + * Handle `getblocks` packet. + * @private + * @param {Peer} + * @param {GetBlocksPacket} + */ + +Pool.prototype.handleGetBlocks = co(function* handleGetBlocks(peer, packet) { + var blocks = []; + var hash; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + hash = yield this.chain.findLocator(packet.locator); + + if (hash) + hash = yield this.chain.db.getNextHash(hash); + + while (hash) { + blocks.push(new InvItem(invTypes.BLOCK, hash)); + + if (hash === packet.stop) + break; + + if (blocks.length === 500) { + peer.hashContinue = hash; + break; + } + + hash = yield this.chain.db.getNextHash(hash); + } + + peer.sendInv(blocks); +}); + +/** + * Get a block/tx from the broadcast map. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Promise} + */ + +Pool.prototype.getBroadcasted = function getBroadcasted(peer, item) { + var type = item.isTX() ? invTypes.TX : invTypes.BLOCK; + var entry = this.invMap.get(item.hash); + + if (!entry) + return; + + if (type !== entry.type) { + this.logger.debug( + 'Peer requested item with the wrong type (%s).', + peer.hostname); + return; + } + + this.logger.debug( + 'Peer requested %s %s as a %s packet (%s).', + item.isTX() ? 'tx' : 'block', + item.rhash(), + item.hasWitness() ? 'witness' : 'normal', + peer.hostname); + + entry.ack(peer); + + return entry.msg; }; +/** + * Get a block/tx either from the broadcast map, mempool, or blockchain. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Promise} + */ + +Pool.prototype.getItem = co(function* getItem(peer, item) { + var entry = this.getBroadcasted(peer, item); + + if (entry) + return entry; + + if (this.options.selfish) + return; + + if (item.isTX()) { + if (!this.mempool) + return; + return this.mempool.getTX(item.hash); + } + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + return yield this.chain.db.getBlock(item.hash); +}); + +/** + * Send a block from the broadcast list or chain. + * @private + * @param {Peer} peer + * @param {InvItem} item + * @returns {Boolean} + */ + +Pool.prototype.sendBlock = co(function* sendBlock(peer, item, witness) { + var block = this.getBroadcasted(peer, item); + + // Check for a broadcasted item first. + if (block) { + peer.send(new packets.BlockPacket(block, witness)); + return true; + } + + if (this.options.selfish + || this.chain.options.spv + || this.chain.options.prune) { + return false; + } + + // If we have the same serialization, we + // can write the raw binary to the socket. + if (witness === this.chain.options.witness) { + block = yield this.chain.db.getRawBlock(item.hash); + + if (block) { + peer.sendRaw('block', block); + return true; + } + + return false; + } + + block = yield this.chain.db.getBlock(item.hash); + + if (block) { + peer.send(new packets.BlockPacket(block, witness)); + return true; + } + + return false; +}); + +/** + * Handle `getdata` packet. + * @private + * @param {Peer} peer + * @param {InvItem[]} items + */ + +Pool.prototype.handleGetData = co(function* handleGetData(peer, items) { + var notFound = []; + var txs = 0; + var blocks = 0; + var unknown = -1; + var i, j, item, tx, block, result, height; + + if (items.length > 50000) + throw new Error('getdata size too large (' + items.length + ').'); + + for (i = 0; i < items.length; i++) { + item = items[i]; + + if (item.isTX()) { + tx = yield this.getItem(peer, item); + + if (!tx) { + notFound.push(item); + continue; + } + + // Coinbases are an insta-ban from any node. + // This should technically never happen, but + // it's worth keeping here just in case. A + // 24-hour ban from any node is rough. + if (tx.isCoinbase()) { + notFound.push(item); + this.logger.warning('Failsafe: tried to relay a coinbase.'); + continue; + } + + peer.send(new packets.TXPacket(tx, item.hasWitness())); + + txs++; + + continue; + } + + switch (item.type) { + case invTypes.BLOCK: + case invTypes.WITNESS_BLOCK: + result = yield this.sendBlock(peer, item, item.hasWitness()); + if (!result) { + notFound.push(item); + continue; + } + blocks++; + break; + case invTypes.FILTERED_BLOCK: + case invTypes.WITNESS_FILTERED_BLOCK: + if (!peer.spvFilter) { + notFound.push(item); + continue; + } + + block = yield this.getItem(peer, item); + + if (!block) { + notFound.push(item); + continue; + } + + block = block.toMerkle(peer.spvFilter); + + peer.send(new packets.MerkleBlockPacket(block)); + + for (j = 0; j < block.txs.length; j++) { + tx = block.txs[j]; + peer.send(new packets.TXPacket(tx, item.hasWitness())); + txs++; + } + + blocks++; + + break; + case invTypes.CMPCT_BLOCK: + height = yield this.chain.db.getHeight(item.hash); + + // Fallback to full block. + if (height < this.chain.tip.height - 10) { + result = yield this.sendBlock(peer, item, peer.compactWitness); + if (!result) { + notFound.push(item); + continue; + } + blocks++; + break; + } + + block = yield this.getItem(peer, item); + + if (!block) { + notFound.push(item); + continue; + } + + peer.sendCompactBlock(block, peer.compactWitness); + + blocks++; + + break; + default: + unknown = item.type; + notFound.push(item); + continue; + } + + if (item.hash === peer.hashContinue) { + peer.sendInv([new InvItem(invTypes.BLOCK, this.chain.tip.hash)]); + peer.hashContinue = null; + } + } + + if (notFound.length > 0) + peer.send(new packets.NotFoundPacket(notFound)); + + if (txs > 0) { + this.logger.debug( + 'Served %d txs with getdata (notfound=%d) (%s).', + txs, notFound.length, peer.hostname); + } + + if (blocks > 0) { + this.logger.debug( + 'Served %d blocks with getdata (notfound=%d) (%s).', + blocks, notFound.length, peer.hostname); + } + + if (unknown !== -1) { + this.logger.warning( + 'Peer sent an unknown getdata type: %s (%d).', + unknown, peer.hostname); + } + + this.emit('getdata', items, peer); +}); + +/** + * Handle `cmpctblock` packet. + * @private + * @param {Peer} peer + * @param {CompactBlock} block + */ + +Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, block) { + var hash = block.hash('hex'); + var witness = this.options.witness; + var result; + + if (!this.options.compact) { + this.logger.info('Peer sent unsolicited cmpctblock (%s).', peer.hostname); + return; + } + + if (!this.mempool) { + this.logger.warning('Requesting compact blocks without a mempool!'); + return; + } + + if (this.compactBlocks.has(hash)) { + this.logger.debug( + 'Peer sent us a duplicate compact block (%s).', + peer.hostname); + return; + } + + if (!block.verify()) { + this.logger.debug( + 'Peer sent an invalid compact block (%s).', + peer.hostname); + peer.increaseBan(100); + return; + } + + result = block.fillMempool(witness, this.mempool); + + if (result) { + this.logger.debug( + 'Received full compact block %s (%s).', + block.rhash(), peer.hostname); + yield this.handleBlock(peer, block.toBlock()); + return; + } + + if (peer.compactBlocks.size >= 10) { + this.logger.warning('Compact block DoS attempt (%s).', peer.hostname); + peer.destroy(); + return; + } + + assert(!peer.compactBlocks.has(hash)); + peer.compactBlocks.set(hash, block); + + this.compactBlocks.insert(hash); + + this.logger.debug( + 'Received semi-full compact block %s (%s).', + block.rhash(), peer.hostname); + + this.emit('cmpctblock', block, peer); + + peer.send(new packets.GetBlockTxnPacket(block.toRequest())); +}); + +/** + * Handle `blocktxn` packet. + * @private + * @param {Peer} peer + * @param {TXResponse} res + */ + +Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(peer, res) { + var block = peer.compactBlocks.get(res.hash); + + if (!block) { + this.logger.debug('Peer sent unsolicited blocktxn (%s).', peer.hostname); + return; + } + + peer.compactBlocks.remove(res.hash); + + assert(this.compactBlocks.has(res.hash)); + this.compactBlocks.remove(res.hash); + + if (!block.fillMissing(res)) { + peer.increaseBan(100); + this.logger.warning('Peer sent non-full blocktxn (%s).', peer.hostname); + return; + } + + this.logger.debug( + 'Filled compact block %s (%s).', + block.rhash(), peer.hostname); + + this.emit('blocktxn', res, peer); + + yield this.handleBlock(peer, block.toBlock()); +}); + +/** + * Handle `getblocktxn` packet. + * @private + * @param {Peer} peer + * @param {TXRequest} req + */ + +Pool.prototype.handleGetBlockTxn = co(function* handleGetBlockTxn(peer, req) { + var res, item, block, height; + + if (this.chain.options.spv) + return; + + if (this.chain.options.prune) + return; + + if (this.options.selfish) + return; + + item = new InvItem(invTypes.BLOCK, req.hash); + + block = yield this.getItem(peer, item); + + if (!block) { + this.logger.debug( + 'Peer sent getblocktxn for non-existent block (%s).', + peer.hostname); + peer.increaseBan(100); + return; + } + + height = yield this.chain.db.getHeight(req.hash); + + if (height < this.chain.tip.height - 15) { + this.logger.debug( + 'Peer sent a getblocktxn for a block > 15 deep (%s)', + peer.hostname); + return; + } + + this.emit('getblocktxn', req, peer); + + res = BIP152.TXResponse.fromBlock(block, req); + + peer.send(new packets.BlockTxnPacket(res, peer.compactWitness)); +}); + +/** + * Handle `mempool` packet. + * @private + * @param {Peer} peer + * @param {MempoolPacket} packet + */ + +Pool.prototype.handleMempool = co(function* handleMempool(peer, packet) { + var items = []; + var i, hashes; + + if (!this.mempool) + return; + + if (!this.chain.synced) + return; + + if (this.options.selfish) + return; + + hashes = this.mempool.getSnapshot(); + + for (i = 0; i < hashes.length; i++) + items.push(new InvItem(invTypes.TX, hashes[i])); + + this.logger.debug('Sending mempool snapshot (%s).', peer.hostname); + + this.emit('mempool', peer); + + peer.sendInv(items); +}); + /** * Create an inbound peer from an existing socket. * @private - * @param {NetAddress} addr * @param {net.Socket} socket */ Pool.prototype.addInbound = function addInbound(socket) { var peer; - if (!this.loaded) - return socket.destroy(); + if (!this.loaded) { + socket.destroy(); + return; + } peer = this.acceptPeer(socket); @@ -1362,7 +2032,6 @@ Pool.prototype.addInbound = function addInbound(socket) { /** * Allocate a host from the host list. - * @param {Boolean} unique * @returns {NetAddress} */ @@ -1498,6 +2167,16 @@ Pool.prototype.removePeer = function removePeer(peer) { hash = hashes[i]; this.fulfill(peer, hash); } + + hashes = peer.compactBlocks.keys(); + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + assert(this.compactBlocks.has(hash)); + this.compactBlocks.remove(hash); + } + + peer.compactBlocks.reset(); }; /** @@ -1537,7 +2216,7 @@ Pool.prototype.setFilter = function setFilter(filter) { return; this.spvFilter = filter; - this.updateWatch(); + this.sendFilterLoad(); }; /** @@ -1551,7 +2230,7 @@ Pool.prototype.watch = function watch(data, enc) { return; this.spvFilter.add(data, enc); - this.updateWatch(); + this.sendFilterLoad(); }; /** @@ -1563,24 +2242,26 @@ Pool.prototype.unwatch = function unwatch() { return; this.spvFilter.reset(); - this.updateWatch(); + this.sendFilterLoad(); }; /** * Resend the bloom filter to peers. */ -Pool.prototype.updateWatch = function updateWatch() { +Pool.prototype.sendFilterLoad = function sendFilterLoad() { var self = this; var peer; if (this.pendingWatch != null) return; + assert(this.spvFilter); + this.pendingWatch = setTimeout(function() { self.pendingWatch = null; for (peer = self.peers.head(); peer; peer = peer.next) - peer.updateWatch(); + peer.sendFilterLoad(self.spvFilter); }, 50); }; @@ -1604,6 +2285,49 @@ Pool.prototype.watchOutpoint = function watchOutpoint(outpoint) { this.watch(outpoint.toRaw()); }; +/** + * Send `getblocks` to peer after building + * locator and resolving orphan root. + * @param {Peer} peer + * @param {Hash} orphan - Orphan hash to resolve. + * @returns {Promise} + */ + +Pool.prototype.resolveOrphan = co(function* resolveOrphan(peer, orphan) { + var locator = yield this.chain.getLocator(); + var root = this.chain.getOrphanRoot(orphan); + + assert(root); + + peer.sendGetBlocks(locator, root); +}); + +/** + * Send `getheaders` to peer after building locator. + * @param {Peer} peer + * @param {Hash} tip - Tip to build chain locator from. + * @param {Hash?} stop + * @returns {Promise} + */ + +Pool.prototype.getHeaders = co(function* getHeaders(peer, tip, stop) { + var locator = yield this.chain.getLocator(tip); + peer.sendGetHeaders(locator, stop); +}); + +/** + * Send `getblocks` to peer after building locator. + * @param {Peer} peer + * @param {Hash} tip - Tip hash to build chain locator from. + * @param {Hash?} stop + * @returns {Promise} + */ + +Pool.prototype.getBlocks = co(function* getBlocks(peer, tip, stop) { + var locator = yield this.chain.getLocator(tip); + peer.sendGetBlocks(locator, stop); +}); + /** * Queue a `getdata` request to be sent. * @param {Peer} peer