From 9c352c5d2b6e7e8ee378e7a50f3fe8ecabab1846 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 19 Jan 2017 01:18:57 -0800 Subject: [PATCH] net: better locks. --- lib/blockchain/chain.js | 19 -- lib/net/peer.js | 121 +++++++++---- lib/net/pool.js | 385 +++++++++++++++------------------------- 3 files changed, 224 insertions(+), 301 deletions(-) diff --git a/lib/blockchain/chain.js b/lib/blockchain/chain.js index 7900f6ad..7499df81 100644 --- a/lib/blockchain/chain.js +++ b/lib/blockchain/chain.js @@ -1117,25 +1117,6 @@ Chain.prototype._resetTime = co(function* resetTime(ts) { yield this._reset(entry.height, false); }); -/** - * Wait for the chain to drain (finish processing - * all of the blocks in its queue). - * @returns {Promise} - */ - -Chain.prototype.onDrain = function onDrain() { - return this.locker.wait(); -}; - -/** - * Test whether the chain is in the process of adding blocks. - * @returns {Boolean} - */ - -Chain.prototype.isBusy = function isBusy() { - return this.locker.isBusy(); -}; - /** * Add a block to the chain, perform all necessary verification. * @param {Block} block diff --git a/lib/net/peer.js b/lib/net/peer.js index 61d33ee5..b3c4afc9 100644 --- a/lib/net/peer.js +++ b/lib/net/peer.js @@ -136,9 +136,8 @@ function Peer(pool) { this.invFilter = new Bloom.Rolling(50000, 0.000001); this.requestMap = new Map(); - this.blockQueue = []; - this.compactBlocks = new Map(); this.responseMap = new Map(); + this.compactBlocks = new Map(); if (this.options.bip151) { this.bip151 = new BIP151(); @@ -263,7 +262,7 @@ Peer.prototype._init = function init() { this.parser.on('packet', co(function* (packet) { try { - yield self.handlePacket(packet); + yield self.readPacket(packet); } catch (e) { self.error(e); self.destroy(); @@ -1402,28 +1401,14 @@ Peer.prototype.getTX = function getTX(hashes) { * @param {Packet} packet */ -Peer.prototype.handlePacket = co(function* handlePacket(packet) { - var unlock; - - // We stop reads and lock the peer for any - // packet with significant IO/asynchronocity. - switch (packet.type) { - case packetTypes.GETDATA: - case packetTypes.GETBLOCKS: - case packetTypes.GETHEADERS: - case packetTypes.GETUTXOS: - case packetTypes.GETBLOCKTXN: - unlock = yield this.locker.lock(); - try { - this.socket.pause(); - return yield this.onPacket(packet); - } finally { - this.socket.resume(); - unlock(); - } - break; - default: - return yield this.onPacket(packet); +Peer.prototype.readPacket = co(function* readPacket(packet) { + var unlock = yield this.locker.lock(); + try { + this.socket.pause(); + return yield this.handlePacket(packet); + } finally { + this.socket.resume(); + unlock(); } }); @@ -1433,7 +1418,7 @@ Peer.prototype.handlePacket = co(function* handlePacket(packet) { * @param {Packet} packet */ -Peer.prototype.onPacket = co(function* onPacket(packet) { +Peer.prototype.handlePacket = co(function* handlePacket(packet) { var entry; if (this.destroyed) @@ -1573,19 +1558,78 @@ Peer.prototype.onPacket = co(function* onPacket(packet) { entry.resolve(packet); }); +/** + * Emit a block. + * @param {Block} block + * @returns {Promise} + */ + +Peer.prototype.emitBlock = co(function* emitBlock(block) { + this.emit('block', block); + yield this.pool.handleBlock(this, block); +}); + +/** + * Emit a block inv. + * @param {Hash[]} blocks + * @returns {Promise} + */ + +Peer.prototype.emitBlockInv = co(function* emitBlockInv(blocks) { + this.emit('blocks', blocks); + yield this.pool.handleBlockInv(this, blocks); +}); + +/** + * Emit headers. + * @param {Header[]} headers + * @returns {Promise} + */ + +Peer.prototype.emitHeaders = co(function* emitHeaders(headers) { + this.emit('headers', headers); + yield this.pool.handleHeaders(this, headers); +}); + /** * Flush merkle block once all matched * txs have been received. * @private + * @returns {Promise} */ -Peer.prototype.flushMerkle = function flushMerkle() { +Peer.prototype.flushMerkle = co(function* flushMerkle() { assert(this.lastMerkle); + this.lastBlock = util.ms(); - this.emit('merkleblock', this.lastMerkle); + + yield this.emitBlock(this.lastMerkle); + this.lastMerkle = null; this.waitingTX = 0; -}; +}); + +/** + * Emit a transaction. + * @param {TX} tx + * @returns {Promise} + */ + +Peer.prototype.emitTX = co(function* emitTX(tx) { + this.emit('tx', tx); + yield this.pool.handleTX(this, tx); +}); + +/** + * Emit transaction inv. + * @param {Hash[]} txs + * @returns {Promise} + */ + +Peer.prototype.emitTXInv = co(function* emitTXInv(txs) { + this.emit('txs', txs); + yield this.pool.handleTXInv(this, txs); +}); /** * Handle `filterload` packet. @@ -1675,7 +1719,7 @@ Peer.prototype.handleMerkleBlock = co(function* handleMerkleBlock(packet) { this.waitingTX = block.matches.length; if (this.waitingTX === 0) - this.flushMerkle(); + yield this.flushMerkle(); }); /** @@ -2400,10 +2444,10 @@ Peer.prototype.handleInv = co(function* handleInv(packet) { this.emit('inv', items); if (blocks.length > 0) - this.emit('blocks', blocks); + yield this.emitBlockInv(blocks); if (txs.length > 0) - this.emit('txs', txs); + yield this.emitTXInv(txs); this.logger.debug( 'Received inv packet with %d items: blocks=%d txs=%d (%s).', @@ -2434,7 +2478,7 @@ Peer.prototype.handleHeaders = co(function* handleHeaders(packet) { return; } - this.emit('headers', headers); + yield this.emitHeaders(headers); }); /** @@ -2464,7 +2508,7 @@ Peer.prototype.handleBlock = co(function* handleBlock(packet) { this.lastBlock = util.ms(); - this.emit('block', packet.block); + yield this.emitBlock(packet.block); }); /** @@ -2481,12 +2525,12 @@ Peer.prototype.handleTX = co(function* handleTX(packet) { if (this.lastMerkle.hasTX(tx)) { this.lastMerkle.addTX(tx); if (--this.waitingTX === 0) - this.flushMerkle(); + yield this.flushMerkle(); return; } } - this.emit('tx', tx); + yield this.emitTX(tx); }); /** @@ -2700,10 +2744,10 @@ Peer.prototype.handleCmpctBlock = co(function* handleCmpctBlock(packet) { if (result) { this.lastBlock = util.ms(); - this.emit('block', block.toBlock()); this.logger.debug( 'Received full compact block %s (%s).', block.rhash(), this.hostname); + yield this.emitBlock(block.toBlock()); return; } @@ -2799,8 +2843,9 @@ Peer.prototype.handleBlockTxn = co(function* handleBlockTxn(packet) { this.lastBlock = util.ms(); - this.emit('block', block.toBlock()); this.emit('getblocktxn', res); + + yield this.emitBlock(block.toBlock()); }); /** diff --git a/lib/net/pool.js b/lib/net/pool.js index 068ada33..84ed4d31 100644 --- a/lib/net/pool.js +++ b/lib/net/pool.js @@ -99,7 +99,7 @@ function Pool(options) { this.chain = this.options.chain; this.mempool = this.options.mempool; this.server = this.options.createServer(); - this.locker = new Lock(); + this.locker = new Lock(true); this.connected = false; this.syncing = false; @@ -108,7 +108,6 @@ function Pool(options) { this.spvFilter = null; this.txFilter = null; this.requestMap = new Map(); - this.queueMap = new Map(); this.invMap = new Map(); this.scheduled = false; this.pendingWatch = null; @@ -445,19 +444,11 @@ Pool.prototype.addLoader = function addLoader() { return; } - addr = this.getHost(false); + addr = this.getHost(); if (!addr) return; - peer = this.peers.get(addr.hostname); - - if (peer) { - this.logger.info('Repurposing peer for loader (%s).', peer.hostname); - this.setLoader(peer); - return; - } - peer = this.createPeer(addr); this.logger.info('Setting loader peer (%s).', peer.hostname); @@ -661,56 +652,6 @@ Pool.prototype.bindPeer = function bindPeer(peer) { self.handleAddr(peer, addrs); }); - peer.on('merkleblock', co(function* (block) { - if (!self.options.spv) - return; - - try { - yield self.handleBlock(peer, block); - } catch (e) { - self.emit('error', e); - } - })); - - peer.on('block', co(function* (block) { - if (self.options.spv) - return; - - try { - yield self.handleBlock(peer, block); - } catch (e) { - self.emit('error', e); - } - })); - - peer.on('tx', co(function* (tx) { - try { - yield self.handleTX(peer, tx); - } catch (e) { - self.emit('error', e); - } - })); - - peer.on('headers', co(function* (headers) { - try { - yield self.handleHeaders(peer, headers); - } catch (e) { - self.emit('error', e); - } - })); - - peer.on('blocks', co(function* (hashes) { - try { - yield self.handleBlockInv(peer, hashes); - } catch (e) { - self.emit('error', e); - } - })); - - peer.on('txs', function(hashes) { - self.handleTXInv(peer, hashes); - }); - peer.on('reject', function(reject) { self.handleReject(peer, reject); }); @@ -840,6 +781,24 @@ Pool.prototype.handleAddr = function handleAddr(peer, addrs) { */ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { + var hash = block.hash('hex'); + var unlock = yield this.locker.lock(hash); + try { + return yield this._handleBlock(peer, block); + } finally { + unlock(); + } +}); + +/** + * Handle `block` packet. Attempt to add to chain (without a lock). + * @private + * @param {Peer} peer + * @param {MemBlock|MerkleBlock} block + * @returns {Promise} + */ + +Pool.prototype._handleBlock = co(function* handleBlock(peer, block) { var hash = block.hash('hex'); var requested; @@ -861,10 +820,8 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { try { yield this.chain.add(block); } catch (err) { - if (err.type !== 'VerifyError') { - this.scheduleRequests(peer); + if (err.type !== 'VerifyError') throw err; - } if (err.reason === 'bad-prevblk') { if (this.options.headers) { @@ -873,24 +830,19 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { } this.logger.debug('Peer sent an orphan block. Resolving.'); yield peer.resolveOrphan(null, hash); - this.scheduleRequests(peer); throw err; } peer.reject(block, err.code, err.reason, err.score); - this.scheduleRequests(peer); - throw err; } - this.scheduleRequests(peer); - if (this.logger.level >= 4 && this.chain.total % 20 === 0) { this.logger.debug('Status:' + ' ts=%s height=%d progress=%s' + ' blocks=%d orphans=%d active=%d' - + ' queue=%d target=%s peers=%d' + + ' target=%s peers=%d' + ' pending=%d jobs=%d', util.date(block.ts), this.chain.height, @@ -898,11 +850,10 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { this.chain.total, this.chain.orphanCount, this.requestMap.size, - this.queueMap.size, block.bits, this.peers.size(), - this.chain.locker.pending, - this.chain.locker.jobs.length); + this.locker.pending, + this.locker.jobs.length); } if (this.chain.total % 2000 === 0) { @@ -922,6 +873,24 @@ Pool.prototype.handleBlock = co(function* handleBlock(peer, block) { */ Pool.prototype.handleTX = co(function* handleTX(peer, tx) { + var hash = tx.hash('hex'); + var unlock = yield this.locker.lock(hash); + try { + return yield this._handleTX(peer, tx); + } finally { + unlock(); + } +}); + +/** + * Handle a transaction. Attempt to add to mempool (without a lock). + * @private + * @param {Peer} peer + * @param {TX} tx + * @returns {Promise} + */ + +Pool.prototype._handleTX = co(function* handleTX(peer, tx) { var hash = tx.hash('hex'); var requested = this.fulfill(peer, hash); var missing; @@ -937,7 +906,6 @@ Pool.prototype.handleTX = co(function* handleTX(peer, tx) { if (!requested) this.txFilter.add(tx.hash()); this.emit('tx', tx, peer); - this.scheduleRequests(peer); return; } @@ -963,15 +931,9 @@ Pool.prototype.handleTX = co(function* handleTX(peer, tx) { 'Requesting %d missing transactions (%s).', missing.length, peer.hostname); - try { - this.getTX(peer, missing); - } catch (e) { - this.emit('error', e); - } + this.getTX(peer, missing); } - this.scheduleRequests(peer); - this.emit('tx', tx, peer); }); @@ -1002,7 +964,9 @@ Pool.prototype.handleHeaders = co(function* handleHeaders(peer, headers) { */ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { - var i, ret, header, hash, last; + var items = []; + var ret = new VerifyResult(); + var i, header, hash, last; if (!this.options.headers) return; @@ -1010,8 +974,6 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { if (!this.syncing) return; - ret = new VerifyResult(); - this.logger.debug( 'Received %s headers from peer (%s).', headers.length, @@ -1019,6 +981,9 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { this.emit('headers', headers); + if (headers.length === 0) + return; + for (i = 0; i < headers.length; i++) { header = headers[i]; hash = header.hash('hex'); @@ -1035,14 +1000,14 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { last = hash; - if (yield this.chain.has(hash)) + if (yield this.hasBlock(hash)) continue; - this.getBlock(peer, hash); + items.push(hash); } - // Schedule the getdata's we just added. - this.scheduleRequests(peer); + // Request the hashes we just added. + this.getBlock(peer, items); // Restart the getheaders process // Technically `last` is not indexed yet so @@ -1051,7 +1016,7 @@ Pool.prototype._handleHeaders = co(function* handleHeaders(peer, headers) { // that much since FindForkInGlobalIndex // simply tries to find the latest block in // the peer's chain. - if (last && headers.length === 2000) + if (headers.length === 2000) yield peer.getHeaders(last); }); @@ -1082,6 +1047,7 @@ Pool.prototype.handleBlockInv = co(function* handleBlockInv(peer, hashes) { */ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { + var items = []; var i, hash; if (!this.syncing) @@ -1101,8 +1067,6 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { yield peer.getHeaders(null, hash); } - this.scheduleRequests(peer); - return; } @@ -1118,20 +1082,14 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { // Resolve orphan chain. if (this.chain.hasOrphan(hash)) { - // There is a possible race condition here. - // The orphan may get resolved by the time - // we create the locator. In that case, we - // should probably actually move to the - // `exists` clause below if it is the last - // hash. this.logger.debug('Received known orphan hash (%s).', peer.hostname); yield peer.resolveOrphan(null, hash); continue; } // Request the block if we don't have it. - if (!(yield this.chain.has(hash))) { - this.getBlock(peer, hash); + if (!(yield this.hasBlock(hash))) { + items.push(hash); continue; } @@ -1148,7 +1106,7 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { } } - this.scheduleRequests(peer); + this.getBlock(peer, items); }); /** @@ -1158,14 +1116,42 @@ Pool.prototype._handleBlockInv = co(function* handleBlockInv(peer, hashes) { * @param {Hash[]} hashes */ -Pool.prototype.handleTXInv = function handleTXInv(peer, hashes) { +Pool.prototype.handleTXInv = co(function* handleTXInv(peer, hashes) { + var unlock = yield this.locker.lock(); + try { + return yield this._handleTXInv(peer, hashes); + } finally { + unlock(); + } +}); + +/** + * Handle peer inv packet (txs). + * @private + * @param {Peer} peer + * @param {Hash[]} hashes + */ + +Pool.prototype._handleTXInv = co(function* handleTXInv(peer, hashes) { + var items = []; + var i, hash; + this.emit('txs', hashes, peer); if (this.syncing && !this.chain.synced) return; - this.getTX(peer, hashes); -}; + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + + if (this.hasTX(hash)) + continue; + + items.push(hash); + } + + this.getTX(peer, items); +}); /** * Handle peer reject event. @@ -1290,7 +1276,7 @@ Pool.prototype.addInbound = function addInbound(socket) { * @returns {NetAddress} */ -Pool.prototype.getHost = function getHost(unique) { +Pool.prototype.getHost = function getHost() { var services = this.options.requiredServices; var now = this.network.now(); var i, entry, addr; @@ -1312,10 +1298,8 @@ Pool.prototype.getHost = function getHost(unique) { addr = entry.addr; - if (unique) { - if (this.peers.has(addr.hostname)) - continue; - } + if (this.peers.has(addr.hostname)) + continue; if (!addr.isValid()) continue; @@ -1355,7 +1339,7 @@ Pool.prototype.addOutbound = function addOutbound() { if (!this.peers.load) return; - addr = this.getHost(true); + addr = this.getHost(); if (!addr) return; @@ -1424,16 +1408,6 @@ Pool.prototype.removePeer = function removePeer(peer) { hash = hashes[i]; this.fulfill(peer, hash); } - - hashes = peer.blockQueue; - - for (i = 0; i < hashes.length; i++) { - hash = hashes[i]; - assert(this.queueMap.has(hash)); - this.queueMap.remove(hash); - } - - peer.blockQueue.length = 0; }; /** @@ -1541,14 +1515,16 @@ Pool.prototype.watchOutpoint = function watchOutpoint(outpoint) { }; /** - * Queue a `getdata` request to be sent. Checks existence - * in the chain before requesting. + * Queue a `getdata` request to be sent. * @param {Peer} peer - * @param {Hash} hash - Block hash. + * @param {Hash[]} hashes * @returns {Promise} */ -Pool.prototype.getBlock = function getBlock(peer, hash) { +Pool.prototype.getBlock = function getBlock(peer, hashes) { + var items = []; + var i, hash; + if (!this.loaded) return; @@ -1558,38 +1534,30 @@ Pool.prototype.getBlock = function getBlock(peer, hash) { if (peer.destroyed) throw new Error('Peer is destroyed (getdata).'); - if (this.requestMap.has(hash)) + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + + if (this.requestMap.has(hash)) + continue; + + this.requestMap.insert(hash); + peer.requestMap.insert(hash); + + items.push(hash); + } + + if (items.length === 0) return; - if (this.queueMap.has(hash)) - return; + this.logger.debug( + 'Requesting %d/%d blocks from peer with getdata (%s).', + items.length, + this.requestMap.size, + peer.hostname); - this.queueMap.insert(hash); - peer.blockQueue.push(hash); + peer.getBlock(items); }; -/** - * Test whether the chain has or has seen an item. - * @param {Hash} hash - * @returns {Promise} - Returns Boolean. - */ - -Pool.prototype.hasBlock = co(function* hasBlock(hash) { - // Check the chain. - if (yield this.chain.has(hash)) - return true; - - // Check the pending requests. - if (this.requestMap.has(hash)) - return true; - - // Check the queued requests. - if (this.queueMap.has(hash)) - return true; - - return false; -}); - /** * Queue a `getdata` request to be sent. Checks existence * in the mempool before requesting. @@ -1614,11 +1582,9 @@ Pool.prototype.getTX = function getTX(peer, hashes) { for (i = 0; i < hashes.length; i++) { hash = hashes[i]; - if (this.hasTX(hash)) + if (this.requestMap.has(hash)) continue; - assert(!this.requestMap.has(hash)); - this.requestMap.insert(hash); peer.requestMap.insert(hash); @@ -1637,6 +1603,24 @@ Pool.prototype.getTX = function getTX(peer, hashes) { peer.getTX(items); }; +/** + * Test whether the chain has or has seen an item. + * @param {Hash} hash + * @returns {Promise} - Returns Boolean. + */ + +Pool.prototype.hasBlock = co(function* hasBlock(hash) { + // Check the lock. + if (this.locker.has(hash)) + return true; + + // Check the chain. + if (yield this.chain.has(hash)) + return true; + + return false; +}); + /** * Test whether the mempool has or has seen an item. * @param {Hash} hash @@ -1644,6 +1628,10 @@ Pool.prototype.getTX = function getTX(peer, hashes) { */ Pool.prototype.hasTX = function hasTX(hash) { + // Check the lock queue. + if (this.locker.has(hash)) + return true; + if (!this.mempool) { // Check the TX filter if // we don't have a mempool. @@ -1661,100 +1649,9 @@ Pool.prototype.hasTX = function hasTX(hash) { } } - // Check the pending requests. - if (this.requestMap.has(hash)) - return true; - return false; }; -/** - * Schedule next batch of `getdata` requests for peer. - * @param {Peer} peer - * @returns {Promise} - */ - -Pool.prototype.scheduleRequests = co(function* scheduleRequests(peer) { - if (this.scheduled) - return; - - this.scheduled = true; - - yield this.chain.onDrain(); - - this.sendBlockRequests(peer); - - this.scheduled = false; -}); - -/** - * Send scheduled requests in the request queues. - * @private - * @param {Peer} peer - */ - -Pool.prototype.sendBlockRequests = function sendBlockRequests(peer) { - var queue = peer.blockQueue; - var hashes = []; - var i, size, hash; - - if (peer.destroyed) - return; - - if (queue.length === 0) - return; - - if (this.options.spv) { - if (this.requestMap.size >= 2000) - return; - - size = Math.min(queue.length, 50000); - } else { - size = this.network.getBatchSize(this.chain.height); - - if (this.requestMap.size >= size) - return; - } - - for (i = 0; i < queue.length; i++) { - hash = queue[i]; - - if (hashes.length === size) - break; - - assert(this.queueMap.has(hash)); - - this.queueMap.remove(hash); - - assert(!this.requestMap.has(hash)); - - // Do a second check to make sure - // we don't have this in the chain. - // Avoids a potential race condition - // with the `chain.has()` call. - if (this.chain.seen(hash)) - continue; - - this.requestMap.insert(hash); - peer.requestMap.insert(hash); - - hashes.push(hash); - } - - peer.blockQueue = queue.slice(i); - - if (hashes.length === 0) - return; - - this.logger.debug( - 'Requesting %d/%d blocks from peer with getdata (%s).', - hashes.length, - this.requestMap.size, - peer.hostname); - - peer.getBlock(hashes); -}; - /** * Fulfill a requested item. * @param {Peer} peer