From e912f78814c3ccc1fda2aeda2f7773bc0991f7c4 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Tue, 16 Feb 2016 03:03:11 -0800 Subject: [PATCH] async chain. --- lib/bcoin/block.js | 16 +- lib/bcoin/chain.js | 377 +++++++++++++++++++++++++++++-------------- lib/bcoin/chaindb.js | 4 +- lib/bcoin/node.js | 12 ++ lib/bcoin/pool.js | 301 +++++++++++++++++++--------------- lib/bcoin/utils.js | 4 +- 6 files changed, 457 insertions(+), 257 deletions(-) diff --git a/lib/bcoin/block.js b/lib/bcoin/block.js index bb7feb8f..b17b9d65 100644 --- a/lib/bcoin/block.js +++ b/lib/bcoin/block.js @@ -443,6 +443,8 @@ Block.prototype.verifyContext = function verifyContext() { } // BIP30 - Ensure there are no duplicate txids + // this.node.hasTX(tx.hash('hex'), function(err, has) { + // if (has) if (this.chain[tx.hash('hex')]) { // Blocks 91842 and 91880 created duplicate // txids by using the same exact output script @@ -465,19 +467,19 @@ Block.prototype.verifyContext = function verifyContext() { if (!input.output) continue; - assert(input.output); + // Ensure tx is not double spending an output + if (!input.output) { + utils.debug('Block is using spent inputs: %s (tx: %s, output: %s)', + this.rhash, tx.hash('hex'), + input.prevout.hash + '/' + input.prevout.index); + return false; + } // Verify the script if (!tx.verify(j, true, flags)) { utils.debug('Block has invalid inputs: %s', this.rhash); return false; } - - // Ensure tx is not double spending an output - // if (this.chain.isSpent(input.prevout.hash, input.prevout.index)) { - // utils.debug('Block is using spent inputs: %s', this.rhash); - // return false; - // } } } } diff --git a/lib/bcoin/chain.js b/lib/bcoin/chain.js index a47101cc..ea6ccfd2 100644 --- a/lib/bcoin/chain.js +++ b/lib/bcoin/chain.js @@ -259,50 +259,149 @@ Chain.prototype._preload = function _preload(callback) { }); }; -Chain.prototype._addEntry = function _addEntry(entry) { +Chain.prototype._saveBlock = function _saveBlock(block, callback) { + var node = bcoin.node.global; + + if (!node) + return callback(); + + node.block.saveBlock(block, function(err) { + if (err) + return callback(err); + + node.mempool.addBlock(block); + + return callback(); + }); +}; + +Chain.prototype._fillCoins = function _fillCoin(block, callback) { + var node = bcoin.node.global; + + if (!node) + return callback(); + + node.block.fillCoins(block, callback); +}; + +Chain.prototype._verifyContext = function _verifyContext(block, prev, callback) { + var node = bcoin.node.global; + + if (!node) + return callback(null, block.verifyContext()); + + var height = prev.height + 1; + var scriptChecks = true; + + node.block.fillCoins(block, function(err) { + var pending; + + if (err) + return callback(err); + + pending = block.txs.length; + + // If we are an ancestor of a checkpoint, we can + // skip the input verification. + if (height < network.checkpoints.lastHeight && !network.checkpoints[height]) + scriptChecks = false; + + if (!block.verifyContext()) + return callback(null, false); + + if (!pending) + return callback(null, true); + + // Check all transactions + block.txs.forEach(function(tx) { + var i; + for (i = 0; j < tx.inputs.length; i++) { + input = tx.inputs[i]; + // Ensure tx is not double spending an output + if (!input.output) { + utils.debug('Block is using spent inputs: %s (tx: %s, output: %s)', + this.rhash, tx.hash('hex'), + input.prevout.hash + '/' + input.prevout.index); + return callback(null, false); + } + } + // BIP30 - Ensure there are no duplicate txids + node.block.hasTX(tx.hash('hex'), function(err, has) { + // Blocks 91842 and 91880 created duplicate + // txids by using the same exact output script + // and extraNonce. + if (has) { + utils.debug('Block is overwriting txids: %s', this.rhash); + if (!(network.type === 'main' && (height === 91842 || height === 91880))) + return callback(null, false); + } + return callback(null, true); + }); + }); + }); +}; + + +Chain.prototype._removeBlock = function _removeBlock(tip, callback) { + var node = bcoin.node.global; + + if (!node) + return callback(); + + node.block.removeBlock(tip, function(err, block) { + if (err) + return callback(err); + + if (!block) + return; + + node.mempool.removeBlock(block); + }); +}; + +Chain.prototype._addEntry = function _addEntry(entry, block, callback) { var self = this; var existing; + callback = utils.asyncify(callback); + // Already added if (this.heightLookup[entry.hash] != null) { assert(this.heightLookup[entry.hash] === entry.height); - return Chain.codes.unchanged; + return callback(null, Chain.codes.unchanged); } // Duplicate height existing = this.db.get(entry.height); if (existing && existing.hash === entry.hash) - return Chain.codes.unchanged; + return callback(null, Chain.codes.unchanged); - // Fork at checkpoint - checkpoint = network.checkpoints[entry.height]; - if (checkpoint) { - this.emit('checkpoint', entry.height, entry.hash, checkpoint); - if (hash !== checkpoint) { - // Resetting to the last checkpoint _really_ isn't - // necessary (even bitcoind doesn't do it), but it - // could be used if you want to be on the overly - // safe (see: paranoid) side. - // this.resetLastCheckpoint(entry.height); - return Chain.codes.badCheckpoint; - } - } + this._saveBlock(block, function(err) { + if (err) + return callback(err); - this._saveEntry(entry, true); + self._saveEntry(entry, function(err) { + if (err) + return callback(err); - return Chain.codes.okay; + return callback(null, Chain.codes.okay); + }); + }); }; -Chain.prototype._saveEntry = function _saveEntry(entry, save) { - if (save) - this.db.save(entry); - +Chain.prototype._saveEntry = function _saveEntry(entry, callback) { this.heightLookup[entry.hash] = entry.height; if (!this.tip || entry.height > this.tip.height) { this.tip = entry; this.emit('tip', this.tip); } + + if (callback) { + if (typeof callback !== 'function') + callback = null; + this.db.save(entry, callback); + } }; Chain.prototype.resetLastCheckpoint = function resetLastCheckpoint(height) { @@ -355,18 +454,19 @@ Chain.prototype.resetTime = function resetTime(ts) { return this.resetHeight(entry.height); }; -Chain.prototype.add = function add(block, peer) { +Chain.prototype.add = function add(block, peer, callback) { + var self = this; var initial = block; var code = Chain.codes.unchanged; - var hash, prevHash, prevHeight, entry, tip, existing; + var hash, prevHash, prevHeight, entry, tip, existing, checkpoint; var total = 0; - for (;;) { + (function next() { hash = block.hash('hex'); prevHash = block.prevBlock; // Find the previous block height/index. - prevHeight = this.heightLookup[prevHash]; + prevHeight = self.heightLookup[prevHash]; // Validate the block we want to add. // This is only necessary for new @@ -374,50 +474,50 @@ Chain.prototype.add = function add(block, peer) { // orphans. if (block === initial && !block.verify()) { code = Chain.codes.invalid; - this.emit('invalid', { + self.emit('invalid', { height: prevHeight + 1, hash: hash }, peer); - break; + return done(null, code); } // If the block is already known to be // an orphan, ignore it. - if (this.orphan.map[prevHash]) { + if (self.orphan.map[prevHash]) { // If the orphan chain forked, simply // reset the orphans and find a new peer. - if (this.orphan.map[prevHash].hash('hex') !== hash) { - this.orphan.map = {}; - this.orphan.bmap = {}; - this.orphan.count = 0; - this.orphan.size = 0; - this.emit('fork', { + if (self.orphan.map[prevHash].hash('hex') !== hash) { + self.orphan.map = {}; + self.orphan.bmap = {}; + self.orphan.count = 0; + self.orphan.size = 0; + self.emit('fork', { height: -1, - expected: this.orphan.map[prevHash].hash('hex'), + expected: self.orphan.map[prevHash].hash('hex'), received: hash, checkpoint: false }, peer); code = Chain.codes.forked; - break; + return done(null, code); } code = Chain.codes.knownOrphan; - break; + return done(null, code); } // If previous block wasn't ever seen, // add it current to orphans and break. if (prevHeight == null) { - this.orphan.count++; - this.orphan.size += block.getSize(); - this.orphan.map[prevHash] = block; - this.orphan.bmap[hash] = block; + self.orphan.count++; + self.orphan.size += block.getSize(); + self.orphan.map[prevHash] = block; + self.orphan.bmap[hash] = block; code = Chain.codes.newOrphan; total++; - break; + return done(null, code); } // Create a new chain entry. - entry = new bcoin.chainblock(this, { + entry = new bcoin.chainblock(self, { hash: hash, version: block.version, prevBlock: prevHash, @@ -430,9 +530,9 @@ Chain.prototype.add = function add(block, peer) { // Add entry if we do not have it (or if // there is another entry at its height) - existing = this.db.get(entry.height); + existing = self.db.get(entry.height); if (!existing || existing.hash !== hash) { - assert(this.heightLookup[entry.hash] == null); + assert(self.heightLookup[entry.hash] == null); // A valid block with an already existing // height came in, that spells fork. We @@ -443,9 +543,9 @@ Chain.prototype.add = function add(block, peer) { // The tip has more chainwork, it is a // higher height than the entry. This is // not an alternate tip. Ignore it. - if (this.tip.chainwork.cmp(entry.chainwork) > 0) { + if (self.tip.chainwork.cmp(entry.chainwork) > 0) { code = Chain.codes.unchanged; - break; + return done(null, code); } // Get _our_ tip as opposed to // the attempted alternate tip. @@ -453,57 +553,63 @@ Chain.prototype.add = function add(block, peer) { // The block has equal chainwork (an // alternate tip). Reset the chain, find // a new peer, and wait to see who wins. - this.resetHeight(entry.height - 1); - this.emit('fork', { + self.resetHeight(entry.height - 1); + self.emit('fork', { height: prevHeight + 1, expected: tip.hash, received: hash, checkpoint: false }, peer); code = Chain.codes.forked; - break; + return self._removeBlock(tip.hash, function(err) { + if (err) + return done(err); + return done(null, code); + }); } - // Do "contextual" verification on our block - // now that we're certain its previous - // block is in the chain. - if (!block.verifyContext()) { - code = Chain.codes.invalid; - this.emit('invalid', { - height: prevHeight + 1, - hash: hash - }, peer); - break; - } - - // Attempt to add block to the chain index. - code = this._addEntry(entry); - - // Result should never be `unchanged` since - // we already verified there were no - // duplicate heights, etc. - assert(code !== Chain.codes.unchanged); - + // Fork at checkpoint // Block did not match the checkpoint. The // chain could be reset to the last sane // checkpoint, but it really isn't necessary, // so we don't do it. The misbehaving peer has // been killed and hopefully we find a peer // who isn't trying to fool us. - if (code === Chain.codes.badCheckpoint) { - this.emit('fork', { - height: entry.height, - expected: network.checkpoints[entry.height], - received: entry.hash, - checkpoint: true - }); - break; + checkpoint = network.checkpoints[entry.height]; + if (checkpoint) { + self.emit('checkpoint', entry.height, entry.hash, checkpoint); + if (hash !== checkpoint) { + // Resetting to the last checkpoint _really_ isn't + // necessary (even bitcoind doesn't do it), but it + // could be used if you want to be on the overly + // safe (see: paranoid) side. + // this.resetLastCheckpoint(entry.height); + code = Chain.codes.badCheckpoint; + self.emit('fork', { + height: entry.height, + expected: network.checkpoints[entry.height], + received: entry.hash, + checkpoint: true + }); + return done(null, code); + } } - // Should never happen, but... something - // went wrong. Ignore this block. - if (code !== Chain.codes.okay) - break; + // Could fill here for contextual verification. + // Also check isSpent here! + // self._fillCoins(block, function(err) { + + // Do "contextual" verification on our block + // now that we're certain its previous + // block is in the chain. + if (!block.verifyContext()) { + code = Chain.codes.invalid; + self.emit('invalid', { + height: prevHeight + 1, + hash: hash + }, peer); + return done(null, code); + } // Update the block height block.height = entry.height; @@ -511,50 +617,79 @@ Chain.prototype.add = function add(block, peer) { tx.height = entry.height; }); - // Keep track of the number of blocks we - // added and the number of orphans resolved. - total++; + // Attempt to add block to the chain index. + self._addEntry(entry, block, function(err, code_) { + if (err) + return done(err); - // Emit our block (and potentially resolved - // orphan) so the programmer can save it. - this.emit('block', block, peer); - this.emit('entry', entry); - if (block !== initial) - this.emit('resolved', block, peer); + code = code_; - // Fullfill request - this.request.fullfill(hash, block); + // Result should never be `unchanged` since + // we already verified there were no + // duplicate heights, etc. + assert(code !== Chain.codes.unchanged); + + // Should always be okay. + assert(code === Chain.codes.okay); + + // Keep track of the number of blocks we + // added and the number of orphans resolved. + total++; + + // Emit our block (and potentially resolved + // orphan) so the programmer can save it. + self.emit('block', block, peer); + self.emit('entry', entry); + if (block !== initial) + self.emit('resolved', block, peer); + + // Fullfill request + self.request.fullfill(hash, block); + + handleOrphans(); + }); + } else { + handleOrphans(); } - if (!this.orphan.map[hash]) - break; + function handleOrphans() { + if (!self.orphan.map[hash]) + return done(null, code); - // An orphan chain was found, start resolving. - block = this.orphan.map[hash]; - delete this.orphan.bmap[block.hash('hex')]; - delete this.orphan.map[hash]; - this.orphan.count--; - this.orphan.size -= block.getSize(); + // An orphan chain was found, start resolving. + block = self.orphan.map[hash]; + delete self.orphan.bmap[block.hash('hex')]; + delete self.orphan.map[hash]; + self.orphan.count--; + self.orphan.size -= block.getSize(); + + next(); + } + })(); + + function done(err, code) { + // Failsafe for large orphan chains. Do not + // allow more than 20mb stored in memory. + if (self.orphan.size > 20971520) { + Object.keys(self.orphan.bmap).forEach(function(hash) { + self.emit('unresolved', self.orphan.bmap[hash], peer); + }); + self.orphan.map = {}; + self.orphan.bmap = {}; + self.orphan.count = 0; + self.orphan.size = 0; + } + + if (code !== Chain.codes.okay) { + if (!(self.options.multiplePeers && code === Chain.codes.newOrphan)) + utils.debug('Chain Error: %s', Chain.msg(code)); + } + + if (err) + return callback(err); + + return callback(null, total); } - - // Failsafe for large orphan chains. Do not - // allow more than 20mb stored in memory. - if (this.orphan.size > 20971520) { - Object.keys(this.orphan.bmap).forEach(function(hash) { - this.emit('unresolved', this.orphan.bmap[hash], peer); - }, this); - this.orphan.map = {}; - this.orphan.bmap = {}; - this.orphan.count = 0; - this.orphan.size = 0; - } - - if (code !== Chain.codes.okay) { - if (!(this.options.multiplePeers && code === Chain.codes.newOrphan)) - utils.debug('Chain Error: %s', Chain.msg(code)); - } - - return total; }; Chain.prototype.has = function has(hash) { diff --git a/lib/bcoin/chaindb.js b/lib/bcoin/chaindb.js index c3b6b702..9be887f4 100644 --- a/lib/bcoin/chaindb.js +++ b/lib/bcoin/chaindb.js @@ -222,8 +222,8 @@ ChainDB.prototype.getAsync = function getAsync(height, callback) { }); }; -ChainDB.prototype.save = function save(entry) { - return this.saveAsync(entry); +ChainDB.prototype.save = function save(entry, callback) { + return this.saveAsync(entry, callback); }; ChainDB.prototype.saveSync = function saveSync(entry) { diff --git a/lib/bcoin/node.js b/lib/bcoin/node.js index a4ea2542..dc52df8e 100644 --- a/lib/bcoin/node.js +++ b/lib/bcoin/node.js @@ -60,6 +60,12 @@ Node.prototype._init = function _init() { this.pool = new bcoin.pool(this.options.pool); this.chain = this.pool.chain; + if (0) + this.pool.on('block', function(block, peer) { + self.mempool.addBlock(block); + }); + + if (0) this.pool.on('block', function(block, peer) { self.block.saveBlock(block, function(err) { if (err) @@ -93,6 +99,12 @@ Node.prototype._init = function _init() { self.emit('error', err); }); + this.pool.on('fork', function(data) { + if (data.block) + self.mempool.removeBlock(block); + }); + + if (0) this.pool.on('fork', function(a, b) { [a, b].forEach(function(hash) { self.block.removeBlock(hash, function(err, block) { diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index c330648a..26bc0df1 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -380,10 +380,15 @@ Pool.prototype._addLoader = function _addLoader() { return; // If the peer sent us a block that was added // to the chain (not orphans), reset the timeout. - if (self._handleBlock(block, peer)) { - self._startInterval(); - self._startTimer(); - } + self._handleBlock(block, peer, function(err, added) { + if (err) + self.emit('error', err); + + if (added) { + self._startInterval(); + self._startTimer(); + } + }); }); peer.on('block', function(block) { @@ -391,10 +396,15 @@ Pool.prototype._addLoader = function _addLoader() { return; // If the peer sent us a block that was added // to the chain (not orphans), reset the timeout. - if (self._handleBlock(block, peer)) { - self._startInterval(); - self._startTimer(); - } + self._handleBlock(block, peer, function(err, added) { + if (err) + self.emit('error', err); + + if (added) { + self._startInterval(); + self._startTimer(); + } + }); }); if (self.options.headers) { @@ -590,126 +600,152 @@ Pool.prototype._prehandleBlock = function _prehandleBlock(block, peer, callback) return callback(null, block); }; -Pool.prototype._handleBlock = function _handleBlock(block, peer) { +Pool.prototype._handleBlock = function _handleBlock(block, peer, callback) { var self = this; - var requested; - // Fulfill our request. - requested = this._response(block); + callback = utils.asyncify(callback); - // Emulate BIP37: emit all the filtered transactions. - if (this.options.fullNode && this.listeners('watched').length > 0) { - block.txs.forEach(function(tx) { - if (self.isWatched(tx)) - self.emit('watched', tx, peer); + this._prehandleBlock(block, peer, function(err) { + var requested; + + if (err) + return callback(err); + + // Fulfill our request. + requested = self._response(block); + + // Emulate BIP37: emit all the filtered transactions. + if (self.options.fullNode && self.listeners('watched').length > 0) { + block.txs.forEach(function(tx) { + if (self.isWatched(tx)) + self.emit('watched', tx, peer); + }); + } + + // Ensure the block was not invalid last time. + // Someone might be sending us bad blocks to DoS us. + if (self.block.invalid[block.hash('hex')]) { + utils.debug('Peer is sending an invalid chain (%s)', peer.host); + self.setMisbehavior(peer, 100); + return callback(null, false); + } + + // Ensure this is not a continuation + // of an invalid chain. + if (self.block.invalid[block.prevBlock]) { + utils.debug( + 'Peer is sending an invalid continuation chain (%s)', + peer.host); + self.setMisbehavior(peer, 100); + return callback(null, false); + } + + // Ignore if we already have. + if (self.chain.has(block)) { + utils.debug('Already have block %s (%s)', block.height, peer.host); + self.setMisbehavior(peer, 1); + return callback(null, false); + } + + // Make sure the block is valid. + if (!block.verify()) { + utils.debug( + 'Block verification failed for %s (%s)', + block.rhash, peer.host); + self.block.invalid[block.hash('hex')] = true; + self.setMisbehavior(peer, 100); + return callback(null, false); + } + + // Someone is sending us blocks without + // us requesting them. + if (!requested) { + utils.debug( + 'Recieved unrequested block: %s (%s)', + block.rhash, peer.host); + } + + // Resolve orphan chain + if (!self.options.headers) { + if (!self.chain.hasBlock(block.prevBlock)) { + // Special case for genesis block. + if (block.isGenesis()) + return callback(null, false); + + // Make sure the peer doesn't send us + // more than 200 orphans every 3 minutes. + if (self.isOrphaning(peer)) { + utils.debug('Peer is orphaning (%s)', peer.host); + self.setMisbehavior(peer, 100); + return callback(null, false); + } + + // NOTE: If we were to emit new orphans here, we + // would not need to store full blocks as orphans. + // However, the listener would not be able to see + // the height until later. + self._addIndex(block, peer, function(err, added) { + if (err) + return callback(err); + + if (added) + self.emit('pool block', block, peer); + + // Resolve orphan chain. + self.peers.load.loadBlocks( + self.chain.getLocator(), + self.chain.getOrphanRoot(block) + ); + + utils.debug('Handled orphan %s (%s)', block.rhash, peer.host); + + return callback(null, false); + }); + + return; + } + } else { + if (!self.chain.hasBlock(block.prevBlock)) { + // Special case for genesis block. + if (block.isGenesis()) + return callback(null, false); + + // Increase banscore by 10 if we're using getheaders. + if (!self.options.multiplePeers) { + if (self.setMisbehavior(peer, 10)) + return callback(null, false); + } + } + } + + // Add to index and emit/save + self._addIndex(block, peer, function(err, added) { + if (err) + return callback(err); + + if (added) { + self.emit('pool block', block, peer); + return callback(null, true); + } + + return callback(null, false); }); - } - - // Ensure the block was not invalid last time. - // Someone might be sending us bad blocks to DoS us. - if (this.block.invalid[block.hash('hex')]) { - utils.debug('Peer is sending an invalid chain (%s)', peer.host); - this.setMisbehavior(peer, 100); - return false; - } - - // Ensure this is not a continuation - // of an invalid chain. - if (this.block.invalid[block.prevBlock]) { - utils.debug( - 'Peer is sending an invalid continuation chain (%s)', - peer.host); - this.setMisbehavior(peer, 100); - return false; - } - - // Ignore if we already have. - if (this.chain.has(block)) { - utils.debug('Already have block %s (%s)', block.height, peer.host); - this.setMisbehavior(peer, 1); - return false; - } - - // Make sure the block is valid. - if (!block.verify()) { - utils.debug( - 'Block verification failed for %s (%s)', - block.rhash, peer.host); - this.block.invalid[block.hash('hex')] = true; - this.setMisbehavior(peer, 100); - return false; - } - - // Someone is sending us blocks without - // us requesting them. - if (!requested) { - utils.debug( - 'Recieved unrequested block: %s (%s)', - block.rhash, peer.host); - } - - // Resolve orphan chain - if (!this.options.headers) { - if (!this.chain.hasBlock(block.prevBlock)) { - // Special case for genesis block. - if (block.isGenesis()) - return false; - - // Make sure the peer doesn't send us - // more than 200 orphans every 3 minutes. - if (this.isOrphaning(peer)) { - utils.debug('Peer is orphaning (%s)', peer.host); - this.setMisbehavior(peer, 100); - return false; - } - - // NOTE: If we were to emit new orphans here, we - // would not need to store full blocks as orphans. - // However, the listener would not be able to see - // the height until later. - if (this._addIndex(block, peer)) - this.emit('pool block', block, peer); - - // Resolve orphan chain. - this.peers.load.loadBlocks( - this.chain.getLocator(), - this.chain.getOrphanRoot(block) - ); - - utils.debug('Handled orphan %s (%s)', block.rhash, peer.host); - - return false; - } - } else { - if (!this.chain.hasBlock(block.prevBlock)) { - // Special case for genesis block. - if (block.isGenesis()) - return false; - - // Increase banscore by 10 if we're using getheaders. - if (!this.options.multiplePeers) { - if (this.setMisbehavior(peer, 10)) - return false; - } - } - } - - // Add to index and emit/save - if (this._addIndex(block, peer)) { - this.emit('pool block', block, peer); - return true; - } + }); }; -Pool.prototype._addIndex = function _addIndex(block, peer) { - var added = this.chain.add(block, peer); +Pool.prototype._addIndex = function _addIndex(block, peer, callback) { + var self = this; + this.chain.add(block, peer, function(err, added) { + if (err) + return callback(err); - if (added === 0) - return false; + if (added === 0) + return callback(null, false); - this.emit('chain-progress', this.chain.fillPercent(), peer); + self.emit('chain-progress', self.chain.fillPercent(), peer); - return true; + return callback(null, true); + }); }; Pool.prototype.isFull = function isFull() { @@ -816,15 +852,28 @@ Pool.prototype._createPeer = function _createPeer(options) { return peer; }; -Pool.prototype._handleTX = function _handleTX(tx, peer) { - var requested = this._response(tx); - var added = this._addTX(tx, 1); +Pool.prototype._handleTX = function _handleTX(tx, peer, callback) { + var self = this; - if (added || tx.block) - this.emit('tx', tx, peer); + callback = utils.asyncify(callback); - if (!this.options.fullNode && tx.block) - this.emit('watched', tx, peer); + this._prehandleTX(tx, peer, function(err) { + var requested, added; + + if (err) + return callback(err); + + requested = self._response(tx); + added = self._addTX(tx, 1); + + if (added || tx.block) + self.emit('tx', tx, peer); + + if (!self.options.fullNode && tx.block) + self.emit('watched', tx, peer); + + return callback(); + }); }; Pool.prototype._addLeech = function _addLeech(socket) { diff --git a/lib/bcoin/utils.js b/lib/bcoin/utils.js index e64574f6..e818a678 100644 --- a/lib/bcoin/utils.js +++ b/lib/bcoin/utils.js @@ -419,7 +419,9 @@ RequestCache.prototype.fullfill = function fullfill(id, err, data) { utils.RequestCache = RequestCache; utils.asyncify = function asyncify(fn) { - return function _asynicifedFn(err, data1, data2) { + if (fn && fn.name === '_asyncifiedFn') + return fn; + return function _asyncifiedFn(err, data1, data2) { if (!fn) return err || data1; utils.nextTick(function() {