diff --git a/lib/net/packets.js b/lib/net/packets.js index c07422bb..ffadc3c9 100644 --- a/lib/net/packets.js +++ b/lib/net/packets.js @@ -1842,7 +1842,7 @@ function RejectPacket(options) { this.message = ''; this.code = constants.reject.INVALID; this.reason = ''; - this.data = null; + this.hash = null; if (options) this.fromOptions(options); @@ -1878,8 +1878,8 @@ RejectPacket.prototype.fromOptions = function fromOptions(options) { if (options.reason) this.reason = options.reason; - if (options.data) - this.data = options.data; + if (options.hash) + this.hash = options.hash; return this; }; @@ -1906,7 +1906,7 @@ RejectPacket.prototype.getSize = function getSize() { size += 1; size += encoding.sizeVarString(this.reason, 'ascii'); - if (this.data) + if (this.hash) size += 32; return size; @@ -1925,8 +1925,8 @@ RejectPacket.prototype.toWriter = function toWriter(bw) { bw.writeU8(this.code); bw.writeVarString(this.reason, 'ascii'); - if (this.data) - bw.writeHash(this.data); + if (this.hash) + bw.writeHash(this.hash); return bw; }; @@ -1953,9 +1953,9 @@ RejectPacket.prototype.fromReader = function fromReader(br) { this.reason = br.readVarString('ascii', 111); if (this.message === 'block' || this.message === 'tx') - this.data = br.readHash('hex'); + this.hash = br.readHash('hex'); else - this.data = null; + this.hash = null; return this; }; @@ -2017,7 +2017,7 @@ RejectPacket.prototype.fromReason = function fromReason(code, reason, obj) { if (obj) { this.message = (obj instanceof TX) ? 'tx' : 'block'; - this.data = obj.hash('hex'); + this.hash = obj.hash('hex'); } return this; @@ -2056,7 +2056,7 @@ RejectPacket.prototype.inspect = function inspect() { + ' msg=' + this.message + ' code=' + (constants.rejectByVal[this.code] || this.code) + ' reason=' + this.reason - + ' data=' + this.data + + ' hash=' + (this.hash ? util.revHex(this.hash) : null) + '>'; }; diff --git a/lib/net/peer.js b/lib/net/peer.js index 70d66e1c..44553d12 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -378,6 +378,11 @@ Peer.prototype.initConnect = function initConnect() { self.connected = true; self.emit('connect'); + if (self.connectTimeout != null) { + clearTimeout(self.connectTimeout); + self.connectTimeout = null; + } + resolve(); }); @@ -1090,6 +1095,7 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { case packetTypes.GETBLOCKS: case packetTypes.GETHEADERS: case packetTypes.GETUTXOS: + case packetTypes.GETBLOCKTXN: unlock = yield this.locker.lock(); this.socket.pause(); @@ -1113,8 +1119,6 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { */ Peer.prototype.onPacket = co(function* onPacket(packet) { - this.lastRecv = util.ms(); - if (this.destroyed) throw new Error('Destroyed peer sent a packet.'); @@ -1138,6 +1142,8 @@ Peer.prototype.onPacket = co(function* onPacket(packet) { this._flushMerkle(); } + this.lastRecv = util.ms(); + switch (packet.type) { case packetTypes.VERSION: return yield this.handleVersion(packet); @@ -2138,19 +2144,18 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { /** * Handle `reject` packet. * @private - * @param {RejectPacket} + * @param {RejectPacket} reject */ -Peer.prototype.handleReject = co(function* handleReject(details) { - var hash, entry; +Peer.prototype.handleReject = co(function* handleReject(reject) { + var entry; - this.fire('reject', details); + this.fire('reject', reject); - if (!details.data) + if (!reject.hash) return; - hash = details.data; - entry = this.pool.invMap[hash]; + entry = this.pool.invMap[reject.hash]; if (!entry) return; @@ -2583,12 +2588,12 @@ Peer.prototype.sendCompact = function sendCompact() { }; /** - * Check whether the peer is misbehaving (banScore >= 100). + * Check whether the peer is banned (banScore >= 100). * @returns {Boolean} */ -Peer.prototype.isMisbehaving = function isMisbehaving() { - return this.pool.hosts.isMisbehaving(this); +Peer.prototype.isBanned = function isBanned() { + return this.pool.hosts.isBanned(this); }; /** diff --git a/lib/net/pool.js b/lib/net/pool.js index 6ba6913e..4c88d22e 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -497,8 +497,8 @@ Pool.prototype.handleLeech = function handleLeech(socket) { return; } - if (this.hosts.isMisbehaving(addr)) { - this.logger.debug('Ignoring misbehaving leech (%s).', addr.hostname); + if (this.hosts.isBanned(addr)) { + this.logger.debug('Ignoring banned leech (%s).', addr.hostname); socket.destroy(); return; } @@ -753,280 +753,6 @@ Pool.prototype.stopSync = co(function* stopSync() { } }); -/** - * Handle `headers` packet from a given peer. - * @private - * @param {Headers[]} headers - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.handleHeaders = co(function* handleHeaders(headers, peer) { - var unlock = yield this.locker.lock(); - try { - return yield this._handleHeaders(headers, peer); - } finally { - unlock(); - } -}); - -/** - * Handle `headers` packet from - * a given peer without a lock. - * @private - * @param {Headers[]} headers - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype._handleHeaders = co(function* handleHeaders(headers, peer) { - var i, ret, header, hash, last; - - if (!this.options.headers) - return; - - ret = new VerifyResult(); - - this.logger.debug( - 'Received %s headers from peer (%s).', - headers.length, - peer.hostname); - - this.emit('headers', headers); - - if (peer.isLoader()) { - // Reset interval to avoid stall behavior. - this.startInterval(); - // Reset timeout to avoid killing the loader. - this.startTimeout(); - } - - for (i = 0; i < headers.length; i++) { - header = headers[i]; - hash = header.hash('hex'); - - if (last && header.prevBlock !== last) { - peer.increaseBan(100); - throw new Error('Bad header chain.'); - } - - if (!header.verify(ret)) { - peer.reject(header, 'invalid', ret.reason, 100); - throw new Error('Invalid header.'); - } - - last = hash; - - if (!(yield this.chain.has(hash))) - this.getBlock(peer, hash); - } - - // Schedule the getdata's we just added. - this.scheduleRequests(peer); - - // Restart the getheaders process - // Technically `last` is not indexed yet so - // the locator hashes will not be entirely - // accurate. However, it shouldn't matter - // that much since FindForkInGlobalIndex - // simply tries to find the latest block in - // the peer's chain. - if (last && headers.length === 2000) - yield peer.getHeaders(last); -}); - -/** - * Handle `inv` packet from peer (containing only BLOCK types). - * @private - * @param {Hash[]} hashes - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.handleBlocks = co(function* handleBlocks(hashes, peer) { - var i, hash; - - assert(!this.options.headers); - - this.logger.debug( - 'Received %s block hashes from peer (%s).', - hashes.length, - peer.hostname); - - this.emit('blocks', hashes); - - if (peer.isLoader()) { - // Reset interval to avoid stall behavior. - this.startInterval(); - // Reset timeout to avoid killing the loader. - this.startTimeout(); - } - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - - // Resolve orphan chain. - if (this.chain.hasOrphan(hash)) { - // There is a possible race condition here. - // The orphan may get resolved by the time - // we create the locator. In that case, we - // should probably actually move to the - // `exists` clause below if it is the last - // hash. - this.logger.debug('Received known orphan hash (%s).', peer.hostname); - yield peer.resolveOrphan(null, hash); - continue; - } - - // Request the block if we don't have it. - if (!(yield this.chain.has(hash))) { - this.getBlock(peer, hash); - continue; - } - - // Normally we request the hashContinue. - // In the odd case where we already have - // it, we can do one of two things: either - // force re-downloading of the block to - // continue the sync, or do a getblocks - // from the last hash (this will reset - // the hashContinue on the remote node). - if (i === hashes.length - 1) { - this.logger.debug('Received existing hash (%s).', peer.hostname); - yield peer.getBlocks(hash, null); - } - } - - this.scheduleRequests(peer); -}); - -/** - * Handle `inv` packet from peer (containing only BLOCK types). - * Potentially request headers if headers mode is enabled. - * @private - * @param {Hash[]} hashes - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.handleInv = co(function* handleInv(hashes, peer) { - var unlock = yield this.locker.lock(); - try { - return yield this._handleInv(hashes, peer); - } finally { - unlock(); - } -}); - -/** - * Handle `inv` packet from peer without a lock. - * @private - * @param {Hash[]} hashes - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype._handleInv = co(function* handleInv(hashes, peer) { - var i, hash; - - // Ignore for now if we're still syncing - if (!this.chain.synced && !peer.isLoader()) - return; - - if (this.options.witness && !peer.version.hasWitness()) - return; - - if (!this.options.headers) { - yield this.handleBlocks(hashes, peer); - return; - } - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - yield peer.getHeaders(null, hash); - } - - this.scheduleRequests(peer); -}); - -/** - * Handle `block` packet. Attempt to add to chain. - * @private - * @param {MemBlock|MerkleBlock} block - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { - var requested = this.fulfill(block); - - // Someone is sending us blocks without - // us requesting them. - if (!requested) { - peer.invFilter.add(block.hash()); - this.logger.warning( - 'Received unrequested block: %s (%s).', - block.rhash(), peer.hostname); - return yield co.wait(); - } - - try { - yield this.chain.add(block); - } catch (err) { - if (err.type !== 'VerifyError') { - this.scheduleRequests(peer); - throw err; - } - - peer.reject(block, err.code, err.reason, err.score); - - if (err.reason === 'bad-prevblk') { - if (this.options.headers) { - peer.increaseBan(10); - throw err; - } - this.logger.debug('Peer sent an orphan block. Resolving.'); - yield peer.resolveOrphan(null, block.hash('hex')); - this.scheduleRequests(peer); - throw err; - } - - this.scheduleRequests(peer); - throw err; - } - - this.scheduleRequests(peer); - - this.emit('chain-progress', this.chain.getProgress(), peer); - - if (this.logger.level >= 4 && this.chain.total % 20 === 0) { - this.logger.debug('Status:' - + ' ts=%s height=%d highest=%d progress=%s' - + ' blocks=%d orphans=%d active=%d' - + ' queue=%d target=%s peers=%d' - + ' pending=%d jobs=%d', - util.date(block.ts), - this.chain.height, - this.chain.bestHeight, - (this.chain.getProgress() * 100).toFixed(2) + '%', - this.chain.total, - this.chain.orphan.count, - this.activeBlocks, - peer.queueBlock.size, - block.bits, - this.peers.size(), - this.chain.locker.pending.length, - this.chain.locker.jobs.length); - } - - if (this.chain.total % 2000 === 0) { - this.logger.info( - 'Received 2000 more blocks (height=%d, hash=%s).', - this.chain.height, - block.rhash()); - } -}); - /** * Send `mempool` to all peers. */ @@ -1102,66 +828,33 @@ Pool.prototype.bindPeer = function bindPeer(peer) { var self = this; peer.once('open', function() { - if (!peer.outbound) - return; - - // Attempt to promote from pending->outbound - self.peers.promote(peer); - - // If we don't have an ack'd loader yet, use this peer. - if (!self.peers.load || !self.peers.load.ack) - self.setLoader(peer); + self.handleOpen(peer); }); peer.once('close', function() { - if (!self.loaded) { - self.removePeer(peer); - return; - } + self.handleClose(peer); + }); - if (!peer.isLoader()) { - self.removePeer(peer); - self.fillPeers(); - return; - } + peer.on('error', function(err) { + self.emit('error', err, peer); + }); - self.removePeer(peer); - self.stopInterval(); - self.stopTimeout(); + peer.on('version', function(version) { + self.handleVersion(version, peer); + }); - if (self.peers.outbound === 0) { - self.logger.warning('%s %s %s', - 'Could not connect to any peers.', - 'Do you have a network connection?', - 'Retrying in 5 seconds.'); - setTimeout(function() { - self.addLoader(); - }, 5000); - return; - } - - self.addLoader(); + peer.on('addr', function(addrs) { + self.handleAddr(addrs, peer); }); peer.on('merkleblock', co(function* (block) { if (!self.options.spv) return; - if (!self.syncing) - return; - - // If the peer sent us a block that was added - // to the chain (not orphans), reset the timeout. try { yield self.handleBlock(block, peer); } catch (e) { self.emit('error', e); - return; - } - - if (peer.isLoader()) { - self.startInterval(); - self.startTimeout(); } })); @@ -1169,61 +862,13 @@ Pool.prototype.bindPeer = function bindPeer(peer) { if (self.options.spv) return; - if (!self.syncing) - return; - - // If the peer sent us a block that was added - // to the chain (not orphans), reset the timeout. try { yield self.handleBlock(block, peer); } catch (e) { self.emit('error', e); - return; - } - - if (peer.isLoader()) { - self.startInterval(); - self.startTimeout(); } })); - peer.on('error', function(err) { - self.emit('error', err, peer); - }); - - peer.on('reject', function(payload) { - var data, code; - - if (payload.data) - data = util.revHex(payload.data); - - code = constants.rejectByVal[payload.code]; - - if (code) - code = code.toLowerCase(); - - self.logger.warning( - 'Received reject (%s): msg=%s code=%s reason=%s data=%s.', - peer.hostname, - payload.message, - code || payload.code, - payload.reason, - data || null); - - self.emit('reject', payload, peer); - }); - - peer.on('notfound', function(items) { - var i, item, req; - - for (i = 0; i < items.length; i++) { - item = items[i]; - req = self.requestMap[item.hash]; - if (req && req.peer === peer) - req.finish(new Error('Not found.')); - } - }); - peer.on('tx', co(function* (tx) { try { yield self.handleTX(tx, peer); @@ -1232,72 +877,7 @@ Pool.prototype.bindPeer = function bindPeer(peer) { } })); - peer.on('addr', function(addrs) { - var i, addr; - - if (self.options.ignoreDiscovery) - return; - - for (i = 0; i < addrs.length; i++) { - addr = addrs[i]; - - if (!addr.hasNetwork()) - continue; - - if (self.options.spv) { - if (!addr.hasBloom()) - continue; - } - - if (self.options.witness) { - if (!addr.hasWitness()) - continue; - } - - if (self.hosts.add(addr)) - self.emit('host', addr, peer); - } - - self.emit('addr', addrs, peer); - self.fillPeers(); - }); - - peer.on('txs', function (txs) { - var i, hash; - - self.emit('txs', txs, peer); - - if (self.syncing && !self.chain.synced) - return; - - for (i = 0; i < txs.length; i++) { - hash = txs[i]; - try { - self.getTX(peer, hash); - } catch (e) { - self.emit('error', e); - } - } - }); - - peer.on('version', function(version) { - self.logger.info( - 'Received version (%s): version=%d height=%d services=%s agent=%s', - peer.hostname, - version.version, - version.height, - version.services.toString(2), - version.agent); - - self.network.time.add(peer.hostname, version.ts); - - self.emit('version', version, peer); - }); - peer.on('headers', co(function* (headers) { - if (!self.syncing) - return; - try { yield self.handleHeaders(headers, peer); } catch (e) { @@ -1306,15 +886,549 @@ Pool.prototype.bindPeer = function bindPeer(peer) { })); peer.on('blocks', co(function* (hashes) { - if (!self.syncing) - return; - try { - yield self.handleInv(hashes, peer); + yield self.handleBlockInv(hashes, peer); } catch (e) { self.emit('error', e); } })); + + peer.on('txs', function (txs) { + self.handleTXInv(txs, peer); + }); + + peer.on('reject', function(reject) { + self.handleReject(reject, peer); + }); + + peer.on('notfound', function(items) { + self.handleNotFound(items, peer); + }); + + peer.on('alert', function(alert) { + self.handleAlert(alert, peer); + }); +}; + +/** + * Handle peer open event. + * @private + * @param {Peer} peer + */ + +Pool.prototype.handleOpen = function handleOpen(peer) { + if (!peer.outbound) + return; + + // Attempt to promote from pending->outbound + this.peers.promote(peer); + + // If we don't have an ack'd loader yet, use this peer. + if (!this.peers.load || !this.peers.load.ack) + this.setLoader(peer); +}; + +/** + * Handle peer close event. + * @private + * @param {Peer} peer + */ + +Pool.prototype.handleClose = function handleClose(peer) { + var self = this; + + if (!this.loaded) { + this.removePeer(peer); + return; + } + + if (!peer.isLoader()) { + this.removePeer(peer); + this.fillPeers(); + return; + } + + this.removePeer(peer); + this.stopInterval(); + this.stopTimeout(); + + if (this.peers.outbound === 0) { + this.logger.warning('%s %s %s', + 'Could not connect to any peers.', + 'Do you have a network connection?', + 'Retrying in 5 seconds.'); + setTimeout(function() { + self.addLoader(); + }, 5000); + return; + } + + this.addLoader(); +}; + +/** + * Handle peer version event. + * @private + * @param {VersionPacket} version + * @param {Peer} peer + */ + +Pool.prototype.handleVersion = function handleVersion(version, peer) { + this.logger.info( + 'Received version (%s): version=%d height=%d services=%s agent=%s', + peer.hostname, + version.version, + version.height, + version.services.toString(2), + version.agent); + + this.network.time.add(peer.hostname, version.ts); + + this.emit('version', version, peer); +}; + +/** + * Handle peer addr event. + * @private + * @param {NetworkAddress[]} addrs + * @param {Peer} peer + */ + +Pool.prototype.handleAddr = function handleAddr(addrs, peer) { + var i, addr; + + if (this.options.ignoreDiscovery) + return; + + for (i = 0; i < addrs.length; i++) { + addr = addrs[i]; + + if (!addr.hasNetwork()) + continue; + + if (this.options.spv) { + if (!addr.hasBloom()) + continue; + } + + if (this.options.witness) { + if (!addr.hasWitness()) + continue; + } + + if (this.hosts.add(addr)) + this.emit('host', addr, peer); + } + + this.emit('addr', addrs, peer); + this.fillPeers(); +}; + +/** + * Handle `block` packet. Attempt to add to chain. + * @private + * @param {MemBlock|MerkleBlock} block + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype.handleBlock = co(function* handleBlock(block, peer) { + var requested; + + if (!this.syncing) + return; + + requested = this.fulfill(block); + + // Someone is sending us blocks without + // us requesting them. + if (!requested) { + peer.invFilter.add(block.hash()); + this.logger.warning( + 'Received unrequested block: %s (%s).', + block.rhash(), peer.hostname); + return; + } + + try { + yield this.chain.add(block); + } catch (err) { + if (err.type !== 'VerifyError') { + this.scheduleRequests(peer); + throw err; + } + + peer.reject(block, err.code, err.reason, err.score); + + if (err.reason === 'bad-prevblk') { + if (this.options.headers) { + peer.increaseBan(10); + throw err; + } + this.logger.debug('Peer sent an orphan block. Resolving.'); + yield peer.resolveOrphan(null, block.hash('hex')); + this.scheduleRequests(peer); + throw err; + } + + this.scheduleRequests(peer); + throw err; + } + + this.scheduleRequests(peer); + + // If the peer sent us a block that was added + // to the chain (not orphans), reset the timeout. + if (peer.isLoader()) { + this.startInterval(); + this.startTimeout(); + } + + if (this.logger.level >= 4 && this.chain.total % 20 === 0) { + this.logger.debug('Status:' + + ' ts=%s height=%d highest=%d progress=%s' + + ' blocks=%d orphans=%d active=%d' + + ' queue=%d target=%s peers=%d' + + ' pending=%d jobs=%d', + util.date(block.ts), + this.chain.height, + this.chain.bestHeight, + (this.chain.getProgress() * 100).toFixed(2) + '%', + this.chain.total, + this.chain.orphan.count, + this.activeBlocks, + peer.queueBlock.size, + block.bits, + this.peers.size(), + this.chain.locker.pending.length, + this.chain.locker.jobs.length); + } + + if (this.chain.total % 2000 === 0) { + this.logger.info( + 'Received 2000 more blocks (height=%d, hash=%s).', + this.chain.height, + block.rhash()); + } +}); + + +/** + * Handle a transaction. Attempt to add to mempool. + * @private + * @param {TX} tx + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype.handleTX = co(function* handleTX(tx, peer) { + var requested = this.fulfill(tx); + var i, missing; + + if (!requested) { + peer.invFilter.add(tx.hash()); + + if (!this.mempool) + this.txFilter.add(tx.hash()); + + this.logger.warning('Peer sent unrequested tx: %s (%s).', + tx.txid(), peer.hostname); + + if (this.hasReject(tx.hash())) { + throw new VerifyError(tx, + 'alreadyknown', + 'txn-already-in-mempool', + 0); + } + } + + if (!this.mempool) { + this.emit('tx', tx, peer); + return; + } + + try { + missing = yield this.mempool.addTX(tx); + } catch (err) { + if (err.type === 'VerifyError') + peer.reject(tx, err.code, err.reason, err.score); + throw err; + } + + if (missing) { + this.logger.debug( + 'Requesting %d missing transactions (%s).', + missing.length, peer.hostname); + + try { + for (i = 0; i < missing.length; i++) + this.getTX(peer, missing[i]); + } catch (e) { + this.emit('error', e); + } + } + + this.emit('tx', tx, peer); +}); + +/** + * Handle `headers` packet from a given peer. + * @private + * @param {Headers[]} headers + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype.handleHeaders = co(function* handleHeaders(headers, peer) { + var unlock = yield this.locker.lock(); + try { + return yield this._handleHeaders(headers, peer); + } finally { + unlock(); + } +}); + +/** + * Handle `headers` packet from + * a given peer without a lock. + * @private + * @param {Headers[]} headers + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype._handleHeaders = co(function* handleHeaders(headers, peer) { + var i, ret, header, hash, last; + + if (!this.options.headers) + return; + + if (!this.syncing) + return; + + ret = new VerifyResult(); + + this.logger.debug( + 'Received %s headers from peer (%s).', + headers.length, + peer.hostname); + + this.emit('headers', headers); + + if (peer.isLoader()) { + // Reset interval to avoid stall behavior. + this.startInterval(); + // Reset timeout to avoid killing the loader. + this.startTimeout(); + } + + for (i = 0; i < headers.length; i++) { + header = headers[i]; + hash = header.hash('hex'); + + if (last && header.prevBlock !== last) { + peer.increaseBan(100); + throw new Error('Bad header chain.'); + } + + if (!header.verify(ret)) { + peer.reject(header, 'invalid', ret.reason, 100); + throw new Error('Invalid header.'); + } + + last = hash; + + if (!(yield this.chain.has(hash))) + this.getBlock(peer, hash); + } + + // Schedule the getdata's we just added. + this.scheduleRequests(peer); + + // Restart the getheaders process + // Technically `last` is not indexed yet so + // the locator hashes will not be entirely + // accurate. However, it shouldn't matter + // that much since FindForkInGlobalIndex + // simply tries to find the latest block in + // the peer's chain. + if (last && headers.length === 2000) + yield peer.getHeaders(last); +}); + +/** + * Handle `inv` packet from peer (containing only BLOCK types). + * Potentially request headers if headers mode is enabled. + * @private + * @param {Hash[]} hashes + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype.handleBlockInv = co(function* handleBlockInv(hashes, peer) { + var unlock = yield this.locker.lock(); + try { + return yield this._handleBlockInv(hashes, peer); + } finally { + unlock(); + } +}); + +/** + * Handle `inv` packet from peer without a lock. + * @private + * @param {Hash[]} hashes + * @param {Peer} peer + * @returns {Promise} + */ + +Pool.prototype._handleBlockInv = co(function* handleBlockInv(hashes, peer) { + var i, hash; + + if (!this.syncing) + return; + + // Ignore for now if we're still syncing + if (!this.chain.synced && !peer.isLoader()) + return; + + if (this.options.witness && !peer.version.hasWitness()) + return; + + // Request headers instead. + if (this.options.headers) { + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + yield peer.getHeaders(null, hash); + } + + this.scheduleRequests(peer); + + return; + } + + this.logger.debug( + 'Received %s block hashes from peer (%s).', + hashes.length, + peer.hostname); + + this.emit('blocks', hashes); + + if (peer.isLoader()) { + // Reset interval to avoid stall behavior. + this.startInterval(); + // Reset timeout to avoid killing the loader. + this.startTimeout(); + } + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + + // Resolve orphan chain. + if (this.chain.hasOrphan(hash)) { + // There is a possible race condition here. + // The orphan may get resolved by the time + // we create the locator. In that case, we + // should probably actually move to the + // `exists` clause below if it is the last + // hash. + this.logger.debug('Received known orphan hash (%s).', peer.hostname); + yield peer.resolveOrphan(null, hash); + continue; + } + + // Request the block if we don't have it. + if (!(yield this.chain.has(hash))) { + this.getBlock(peer, hash); + continue; + } + + // Normally we request the hashContinue. + // In the odd case where we already have + // it, we can do one of two things: either + // force re-downloading of the block to + // continue the sync, or do a getblocks + // from the last hash (this will reset + // the hashContinue on the remote node). + if (i === hashes.length - 1) { + this.logger.debug('Received existing hash (%s).', peer.hostname); + yield peer.getBlocks(hash, null); + } + } + + this.scheduleRequests(peer); +}); + +/** + * Handle peer inv packet (txs). + * @private + * @param {Hash[]} txs + * @param {Peer} peer + */ + +Pool.prototype.handleTXInv = function handleTXInv(txs, peer) { + var i, hash; + + this.emit('txs', txs, peer); + + if (this.syncing && !this.chain.synced) + return; + + for (i = 0; i < txs.length; i++) { + hash = txs[i]; + try { + this.getTX(peer, hash); + } catch (e) { + this.emit('error', e); + } + } +}; + +/** + * Handle peer reject event. + * @private + * @param {RejectPacket} reject + * @param {Peer} peer + */ + +Pool.prototype.handleReject = function handleReject(reject, peer) { + var data, code; + + if (reject.hash) + data = util.revHex(reject.hash); + + code = constants.rejectByVal[reject.code]; + + if (code) + code = code.toLowerCase(); + + this.logger.warning( + 'Received reject (%s): msg=%s code=%s reason=%s data=%s.', + peer.hostname, + reject.message, + code || reject.code, + reject.reason, + data || null); + + this.emit('reject', reject, peer); +}; + +/** + * Handle peer notfound packet. + * @private + * @param {InvItem[]} items + * @param {Peer} peer + */ + +Pool.prototype.handleNotFound = function handleNotFound(items, peer) { + var i, item, req; + + for (i = 0; i < items.length; i++) { + item = items[i]; + req = this.requestMap[item.hash]; + if (req && req.peer === peer) + req.finish(new Error('Not found.')); + } }; /** @@ -1390,64 +1504,6 @@ Pool.prototype.hasReject = function hasReject(hash) { return this.mempool.hasReject(hash); }; -/** - * Handle a transaction. Attempt to add to mempool. - * @private - * @param {TX} tx - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.handleTX = co(function* handleTX(tx, peer) { - var requested = this.fulfill(tx); - var i, missing; - - if (!requested) { - peer.invFilter.add(tx.hash()); - - if (!this.mempool) - this.txFilter.add(tx.hash()); - - this.logger.warning('Peer sent unrequested tx: %s (%s).', - tx.txid(), peer.hostname); - - if (this.hasReject(tx.hash())) { - throw new VerifyError(tx, - 'alreadyknown', - 'txn-already-in-mempool', - 0); - } - } - - if (!this.mempool) { - this.emit('tx', tx, peer); - return; - } - - try { - missing = yield this.mempool.addTX(tx); - } catch (err) { - if (err.type === 'VerifyError') - peer.reject(tx, err.code, err.reason, err.score); - throw err; - } - - if (missing) { - this.logger.debug( - 'Requesting %d missing transactions (%s).', - missing.length, peer.hostname); - - try { - for (i = 0; i < missing.length; i++) - this.getTX(peer, missing[i]); - } catch (e) { - this.emit('error', e); - } - } - - this.emit('tx', tx, peer); -}); - /** * Create an inbound peer from an existing socket. * @private @@ -1557,6 +1613,7 @@ Pool.prototype.removePeer = function removePeer(peer) { Pool.prototype.setFilter = function setFilter(filter) { if (!this.options.spv) return; + this.spvFilter = filter; this.updateWatch(); }; @@ -1570,6 +1627,7 @@ Pool.prototype.setFilter = function setFilter(filter) { Pool.prototype.watch = function watch(data, enc) { if (!this.options.spv) return; + this.spvFilter.add(data, enc); this.updateWatch(); }; @@ -1581,6 +1639,7 @@ Pool.prototype.watch = function watch(data, enc) { Pool.prototype.unwatch = function unwatch() { if (!this.options.spv) return; + this.spvFilter.reset(); this.updateWatch(); }; @@ -1928,8 +1987,8 @@ Pool.prototype.unban = function unban(addr) { * @returns {Boolean} */ -Pool.prototype.isMisbehaving = function isMisbehaving(addr) { - return this.hosts.isMisbehaving(addr); +Pool.prototype.isBanned = function isBanned(addr) { + return this.hosts.isBanned(addr); }; /** @@ -2110,7 +2169,7 @@ function HostList(pool) { this.list = new List(); this.seeds = []; this.map = {}; - this.misbehaving = {}; + this.banned = {}; this.setSeeds(this.network.seeds); } @@ -2151,11 +2210,11 @@ HostList.prototype.reset = function reset() { }; /** - * Clear misbehaving and ignored. + * Clear banned and ignored. */ HostList.prototype.clear = function clear() { - this.misbehaving = {}; + this.banned = {}; }; /** @@ -2208,12 +2267,12 @@ HostList.prototype.remove = function remove(addr) { }; /** - * Mark a peer as misbehaving. + * Mark a peer as banned. * @param {NetworkAddress} addr */ HostList.prototype.ban = function ban(addr) { - this.misbehaving[addr.host] = util.now(); + this.banned[addr.host] = util.now(); return this.remove(addr); }; @@ -2223,7 +2282,7 @@ HostList.prototype.ban = function ban(addr) { */ HostList.prototype.unban = function unban(addr) { - delete this.misbehaving[addr.host]; + delete this.banned[addr.host]; }; /** @@ -2232,14 +2291,14 @@ HostList.prototype.unban = function unban(addr) { * @returns {Boolean} */ -HostList.prototype.isMisbehaving = function isMisbehaving(addr) { - var time = this.misbehaving[addr.host]; +HostList.prototype.isBanned = function isBanned(addr) { + var time = this.banned[addr.host]; if (time == null) return false; if (util.now() > time + constants.BAN_TIME) { - delete this.misbehaving[addr.host]; + delete this.banned[addr.host]; return false; }