diff --git a/lib/http/rpc.js b/lib/http/rpc.js index 7e7beb3c..99ea66f0 100644 --- a/lib/http/rpc.js +++ b/lib/http/rpc.js @@ -543,7 +543,7 @@ RPC.prototype.getpeerinfo = co(function* getpeerinfo(args, help) { besthash: peer.bestHash ? util.revHex(peer.bestHash) : null, bestheight: peer.bestHeight, banscore: peer.banScore, - inflight: peer.requestMap.keys().map(util.revHex), + inflight: peer.blockMap.keys().map(util.revHex), whitelisted: false }); } diff --git a/lib/net/peer.js b/lib/net/peer.js index 6391474b..582de881 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -137,7 +137,8 @@ function Peer(options) { this.addrFilter = new Bloom.Rolling(5000, 0.001); this.invFilter = new Bloom.Rolling(50000, 0.000001); - this.requestMap = new Map(); + this.blockMap = new Map(); + this.txMap = new Map(); this.responseMap = new Map(); this.compactBlocks = new Map(); @@ -195,8 +196,7 @@ Peer.RESPONSE_TIMEOUT = 30000; /** * Required time for loader to - * respond with block/merkleblock - * during initial sync. + * respond with block/merkleblock. * @const {Number} * @default */ @@ -204,12 +204,13 @@ Peer.RESPONSE_TIMEOUT = 30000; Peer.BLOCK_TIMEOUT = 120000; /** - * Max number of requested items. + * Required time for loader to + * respond with a tx. * @const {Number} * @default */ -Peer.MAX_REQUESTS = 5000; +Peer.TX_TIMEOUT = 120000; /** * Generic timeout interval. @@ -1317,24 +1318,30 @@ Peer.prototype.maybeTimeout = function maybeTimeout() { } } - if (this.options.isFull()) { - if (this.requestMap.size > Peer.MAX_REQUESTS) { - this.error('Peer is stalling (data).'); - this.destroy(); - return; - } - - keys = this.requestMap.keys(); + if (this.options.isFull() || !this.syncing) { + keys = this.blockMap.keys(); for (i = 0; i < keys.length; i++) { key = keys[i]; - ts = this.requestMap.get(key); + ts = this.blockMap.get(key); if (now > ts + Peer.BLOCK_TIMEOUT) { this.error('Peer is stalling (block).'); this.destroy(); return; } } + + keys = this.txMap.keys(); + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + ts = this.txMap.get(key); + if (now > ts + Peer.TX_TIMEOUT) { + this.error('Peer is stalling (tx).'); + this.destroy(); + return; + } + } } if (now > this.ts + 60000) { diff --git a/lib/net/pool.js b/lib/net/pool.js index ccb20536..2741b1c9 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -93,7 +93,8 @@ function Pool(options) { this.syncing = false; this.spvFilter = null; this.txFilter = null; - this.requestMap = new Map(); + this.blockMap = new Map(); + this.txMap = new Map(); this.compactBlocks = new Map(); this.invMap = new Map(); this.pendingFilter = null; @@ -173,7 +174,7 @@ Pool.prototype._init = function _init() { }); this.chain.on('full', function() { - self.resync(); + self.sync(); self.emit('full'); self.logger.info('Chain is fully synced (height=%d).', self.chain.height); }); @@ -356,7 +357,8 @@ Pool.prototype._disconnect = co(function* disconnect() { this.peers.destroy(); - this.requestMap.reset(); + this.blockMap.reset(); + this.txMap.reset(); if (this.pendingFilter != null) { clearTimeout(this.pendingFilter); @@ -702,6 +704,14 @@ Pool.prototype.forceSync = function forceSync() { this.resync(true); }; +/** + * Send a sync to each peer. + */ + +Pool.prototype.sync = function* sync(force) { + this.resync(false); +}; + /** * Stop the sync. * @private @@ -718,8 +728,14 @@ Pool.prototype.stopSync = function stopSync() { for (peer = this.peers.head(); peer; peer = peer.next) { if (!peer.outbound) continue; + peer.syncing = false; + peer.blockMap.reset(); + peer.compactBlocks.reset(); } + + this.blockMap.reset(); + this.compactBlocks.reset(); }; /** @@ -1375,7 +1391,7 @@ Pool.prototype.handleOpen = co(function* handleOpen(peer) { Pool.prototype.handleClose = co(function* handleClose(peer, connected) { var outbound = peer.outbound; var loader = peer.loader; - var size = peer.requestMap.size; + var size = peer.blockMap.size; this.removePeer(peer); @@ -1395,9 +1411,9 @@ Pool.prototype.handleClose = co(function* handleClose(peer, connected) { if (this.disconnecting) return; - if (this.chain.synced && !loader && size > 0) { - this.logger.debug('Peer with requested blocks disconnected.'); - this.logger.debug('Resending sync...'); + if (this.chain.synced && size > 0) { + this.logger.warning('Peer disconnected with requested blocks.'); + this.logger.warning('Resending sync...'); this.forceSync(); } @@ -1918,7 +1934,14 @@ Pool.prototype.handleNotFound = co(function* handleNotFound(peer, packet) { for (i = 0; i < items.length; i++) { item = items[i]; - this.fulfill(peer, item.hash); + + if (!this.resolveItem(peer, item)) { + this.logger.warning( + 'Peer sent notfound for unrequested item: %s (%s).', + item.hash, peer.hostname()); + peer.destroy(); + return; + } } }); @@ -2222,7 +2245,7 @@ Pool.prototype._addBlock = co(function* addBlock(peer, block, flags) { if (!this.syncing) return; - if (!this.fulfill(peer, hash)) { + if (!this.resolveBlock(peer, hash)) { this.logger.warning( 'Received unrequested block: %s (%s).', block.rhash(), peer.hostname()); @@ -2380,7 +2403,7 @@ Pool.prototype.logStatus = function logStatus(block) { (this.chain.getProgress() * 100).toFixed(2) + '%', this.chain.total, this.chain.orphanCount, - this.requestMap.size, + this.blockMap.size, block.bits, this.peers.size(), this.locker.jobs.length); @@ -2442,7 +2465,7 @@ Pool.prototype._handleTX = co(function* handleTX(peer, packet) { } } - if (!this.fulfill(peer, hash)) { + if (!this.resolveTX(peer, hash)) { this.logger.warning( 'Peer sent unrequested tx: %s (%s).', tx.txid(), peer.hostname()); @@ -2627,7 +2650,7 @@ Pool.prototype._handleMerkleBlock = co(function* handleMerkleBlock(peer, packet) return; } - if (!peer.requestMap.has(hash)) { + if (!peer.blockMap.has(hash)) { this.logger.warning( 'Peer sent an unrequested merkleblock (%s).', peer.hostname()); @@ -2733,7 +2756,7 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { return; } - if (!peer.requestMap.has(hash)) { + if (!peer.blockMap.has(hash)) { if (this.options.blockMode !== 1) { this.logger.warning( 'Peer sent us an unrequested compact block (%s).', @@ -2741,9 +2764,9 @@ Pool.prototype.handleCmpctBlock = co(function* handleCmpctBlock(peer, packet) { peer.destroy(); return; } - peer.requestMap.set(hash, util.ms()); - assert(!this.requestMap.has(hash)); - this.requestMap.insert(hash); + peer.blockMap.set(hash, util.ms()); + assert(!this.blockMap.has(hash)); + this.blockMap.insert(hash); } if (!this.mempool) { @@ -3132,11 +3155,18 @@ Pool.prototype.removePeer = function removePeer(peer) { this.peers.remove(peer); - hashes = peer.requestMap.keys(); + hashes = peer.blockMap.keys(); for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - this.fulfill(peer, hash); + this.resolveBlock(peer, hash); + } + + hashes = peer.txMap.keys(); + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + this.resolveTX(peer, hash); } hashes = peer.compactBlocks.keys(); @@ -3340,11 +3370,14 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) { for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - if (this.requestMap.has(hash)) + if (this.blockMap.has(hash)) continue; - this.requestMap.insert(hash); - peer.requestMap.set(hash, now); + this.blockMap.insert(hash); + peer.blockMap.set(hash, now); + + if (this.chain.synced) + now += 100; items.push(hash); } @@ -3355,7 +3388,7 @@ Pool.prototype.getBlock = function getBlock(peer, hashes) { this.logger.debug( 'Requesting %d/%d blocks from peer with getdata (%s).', items.length, - this.requestMap.size, + this.blockMap.size, peer.hostname()); peer.getBlock(items); @@ -3384,11 +3417,13 @@ Pool.prototype.getTX = function getTX(peer, hashes) { for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - if (this.requestMap.has(hash)) + if (this.txMap.has(hash)) continue; - this.requestMap.insert(hash); - peer.requestMap.set(hash, now); + this.txMap.insert(hash); + peer.txMap.set(hash, now); + + now += 50; items.push(hash); } @@ -3399,7 +3434,7 @@ Pool.prototype.getTX = function getTX(peer, hashes) { this.logger.debug( 'Requesting %d/%d txs from peer with getdata (%s).', items.length, - this.requestMap.size, + this.txMap.size, peer.hostname()); peer.getTX(items); @@ -3479,24 +3514,60 @@ Pool.prototype.ensureTX = function ensureTX(peer, hashes) { }; /** - * Fulfill a requested item. + * Fulfill a requested tx. * @param {Peer} peer * @param {Hash} hash * @returns {Boolean} */ -Pool.prototype.fulfill = function fulfill(peer, hash) { - if (!peer.requestMap.has(hash)) +Pool.prototype.resolveTX = function resolveTX(peer, hash) { + if (!peer.txMap.has(hash)) return false; - peer.requestMap.remove(hash); + peer.txMap.remove(hash); - assert(this.requestMap.has(hash)); - this.requestMap.remove(hash); + assert(this.txMap.has(hash)); + this.txMap.remove(hash); return true; }; +/** + * Fulfill a requested block. + * @param {Peer} peer + * @param {Hash} hash + * @returns {Boolean} + */ + +Pool.prototype.resolveBlock = function resolveBlock(peer, hash) { + if (!peer.blockMap.has(hash)) + return false; + + peer.blockMap.remove(hash); + + assert(this.blockMap.has(hash)); + this.blockMap.remove(hash); + + return true; +}; + +/** + * Fulfill a requested item. + * @param {Peer} peer + * @param {InvItem} item + * @returns {Boolean} + */ + +Pool.prototype.resolveItem = function resolveItem(peer, item) { + if (item.isBlock()) + return this.resolveBlock(peer, item.hash); + + if (item.isTX()) + return this.resolveTX(peer, item.hash); + + return false; +}; + /** * Broadcast a transaction or block. * @param {TX|Block} msg