From c8bc9fb8b6ea6d9804c65cac33067735291a0658 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Mon, 15 Aug 2016 15:46:37 -0700 Subject: [PATCH] mempool: rewrite. --- lib/bcoin/bip152.js | 63 +-- lib/bcoin/fullnode.js | 117 ++--- lib/bcoin/http/rpc.js | 102 ++-- lib/bcoin/mempool.js | 1145 ++++++++++++++++++++--------------------- lib/bcoin/peer.js | 60 +-- lib/bcoin/pool.js | 2 +- lib/bcoin/rbt.js | 3 + test/block-test.js | 56 +- test/mempool-test.js | 49 +- 9 files changed, 727 insertions(+), 870 deletions(-) diff --git a/lib/bcoin/bip152.js b/lib/bcoin/bip152.js index 262e17ec..5d462840 100644 --- a/lib/bcoin/bip152.js +++ b/lib/bcoin/bip152.js @@ -171,51 +171,42 @@ CompactBlock.prototype.toRequest = function toRequest() { return TXRequest.fromCompact(this); }; -CompactBlock.prototype.fillMempool = function fillMempool(mempool, callback) { - var self = this; +CompactBlock.prototype.fillMempool = function fillMempool(mempool) { var have = {}; - var id, index; + var hashes = mempool.getSnapshot(); + var i, id, index, hash, tx; - mempool.getSnapshot(function(err, hashes) { - if (err) - return callback(err); + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + id = this.sid(hash); + index = this.idMap[id]; - utils.forEachSerial(hashes, function(hash, next) { - id = self.sid(hash); - index = self.idMap[id]; + if (index == null) + continue; - if (index == null) - return next(); + if (have[index]) { + // Siphash collision, just request it. + this.available[index] = null; + this.count--; + continue; + } - if (have[index]) { - // Siphash collision, just request it. - self.available[index] = null; - self.count--; - return next(); - } + tx = mempool.getTX(hash); - mempool.getTX(hash, function(err, tx) { - if (err) - return callback(err); + if (!tx) + continue; - // Race condition: tx - // fell out of mempool. - if (!tx) - return next(); + this.available[index] = tx; + have[index] = true; + this.count++; - self.available[index] = tx; - have[index] = true; - self.count++; + // We actually may have a siphash collision + // here, but exit early anyway for perf. + if (this.count === this.totalTX) + return true; + } - // We actually may have a siphash collision - // here, but exit early anyway for perf. - if (self.count === self.totalTX) - return callback(null, true); - - next(); - }); - }, callback); - }); + return false; }; CompactBlock.prototype.fillMissing = function fillMissing(res) { diff --git a/lib/bcoin/fullnode.js b/lib/bcoin/fullnode.js index cfad931a..2ea43fe9 100644 --- a/lib/bcoin/fullnode.js +++ b/lib/bcoin/fullnode.js @@ -483,32 +483,15 @@ Fullnode.prototype.getFullBlock = function getFullBlock(hash, callback) { */ Fullnode.prototype.getCoin = function getCoin(hash, index, callback) { - var self = this; - this.mempool.getCoin(hash, index, function(err, coin) { - if (err) - return callback(err); + var coin = this.mempool.getCoin(hash, index); - if (coin) - return callback(null, coin); + if (coin) + return callback(null, coin); - self.chain.db.getCoin(hash, index, function(err, coin) { - if (err) - return callback(err); + if (this.mempool.isSpent(hash, index)) + return callback(); - if (!coin) - return callback(); - - self.mempool.isSpent(hash, index, function(err, spent) { - if (err) - return callback(err); - - if (spent) - return callback(); - - return callback(null, coin); - }); - }); - }); + this.chain.db.getCoin(hash, index, callback); }; /** @@ -520,29 +503,23 @@ Fullnode.prototype.getCoin = function getCoin(hash, index, callback) { Fullnode.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, callback) { var self = this; - this.mempool.getCoinsByAddress(addresses, function(err, coins) { + var coins = this.mempool.getCoinsByAddress(addresses); + + this.chain.db.getCoinsByAddress(addresses, function(err, blockCoins) { if (err) return callback(err); - self.chain.db.getCoinsByAddress(addresses, function(err, blockCoins) { + utils.forEach(blockCoins, function(coin, next) { + var spent = self.mempool.isSpent(coin.hash, coin.index); + + if (!spent) + coins.push(coin); + + return next(); + }, function(err) { if (err) return callback(err); - - utils.forEach(blockCoins, function(coin, next) { - self.mempool.isSpent(coin.hash, coin.index, function(err, spent) { - if (err) - return callback(err); - - if (!spent) - coins.push(coin); - - return next(); - }); - }, function(err) { - if (err) - return callback(err); - return callback(null, coins); - }); + return callback(null, coins); }); }); }; @@ -554,25 +531,12 @@ Fullnode.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, cal */ Fullnode.prototype.getTX = function getTX(hash, callback) { - var self = this; + var tx = this.mempool.getTX(hash); - this.mempool.getTX(hash, function(err, tx) { - if (err) - return callback(err); + if (tx) + return callback(null, tx); - if (tx) - return callback(null, tx); - - self.chain.db.getTX(hash, function(err, tx) { - if (err) - return callback(err); - - if (!tx) - return callback(); - - return callback(null, tx); - }); - }); + this.chain.db.getTX(hash, callback); }; /** @@ -582,17 +546,10 @@ Fullnode.prototype.getTX = function getTX(hash, callback) { */ Fullnode.prototype.hasTX = function hasTX(hash, callback) { - var self = this; + if (this.mempool.hasTX(hash)) + return callback(null, true); - this.mempool.hasTX(hash, function(err, result) { - if (err) - return callback(err); - - if (result) - return callback(null, true); - - self.chain.db.hasTX(hash, callback); - }); + this.chain.db.hasTX(hash, callback); }; /** @@ -603,17 +560,10 @@ Fullnode.prototype.hasTX = function hasTX(hash, callback) { */ Fullnode.prototype.isSpent = function isSpent(hash, index, callback) { - var self = this; + if (this.mempool.isSpent(hash, index)) + return callback(null, true); - this.mempool.isSpent(hash, index, function(err, spent) { - if (err) - return callback(err); - - if (spent) - return callback(null, true); - - self.chain.db.isSpent(hash, index, callback); - }); + this.chain.db.isSpent(hash, index, callback); }; /** @@ -624,18 +574,13 @@ Fullnode.prototype.isSpent = function isSpent(hash, index, callback) { */ Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback) { - var self = this; + var mempool = this.mempool.getTXByAddress(addresses); - this.mempool.getTXByAddress(addresses, function(err, mempool) { + this.chain.db.getTXByAddress(addresses, function(err, txs) { if (err) return callback(err); - self.chain.db.getTXByAddress(addresses, function(err, txs) { - if (err) - return callback(err); - - return callback(null, mempool.concat(txs)); - }); + return callback(null, mempool.concat(txs)); }); }; diff --git a/lib/bcoin/http/rpc.js b/lib/bcoin/http/rpc.js index 817b95a9..e190651c 100644 --- a/lib/bcoin/http/rpc.js +++ b/lib/bcoin/http/rpc.js @@ -1094,50 +1094,41 @@ RPC.prototype.getrawmempool = function getrawmempool(args, callback) { RPC.prototype.mempoolToJSON = function mempoolToJSON(verbose, callback) { var self = this; var out = {}; - var tx; + var i, tx, hashes, hash, entry; if (verbose) { - return this.mempool.getSnapshot(function(err, hashes) { - if (err) - return callback(err); + hashes = this.mempool.getSnapshot(); - utils.forEachSerial(hashes, function(hash, next) { - self.mempool.getEntry(hash, function(err, entry) { - if (err) - return callback(err); + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + entry = this.mempool.getEntry(hash); - tx = entry.tx; + if (!entry) + continue; - out[tx.rhash] = { - size: entry.size, - fee: entry.fee, - modifiedfee: entry.fees, - time: entry.ts, - height: entry.height, - startingpriority: entry.priority, - currentpriority: entry.getPriority(self.chain.height), - descendantcount: 0, - descendantsize: entry.size, - descendantfees: entry.fees, - depends: [] - }; + tx = entry.tx; - next(); - }); - }, function(err) { - if (err) - return callback(err); - return callback(null, out); - }); - }); + out[tx.rhash] = { + size: entry.size, + fee: entry.fee, + modifiedfee: entry.fees, + time: entry.ts, + height: entry.height, + startingpriority: entry.priority, + currentpriority: entry.getPriority(self.chain.height), + descendantcount: 0, + descendantsize: entry.size, + descendantfees: entry.fees, + depends: [] + }; + } + + return callback(null, out); } - this.mempool.getSnapshot(function(err, hashes) { - if (err) - return callback(err); + hashes = this.mempool.getSnapshot(); - return callback(null, hashes.map(utils.revHex)); - }); + return callback(null, hashes.map(utils.revHex)); }; RPC.prototype.gettxout = function gettxout(args, callback) { @@ -1573,7 +1564,7 @@ RPC.prototype.getnetworkhashps = function getnetworkhashps(args, callback) { RPC.prototype.prioritisetransaction = function prioritisetransaction(args, callback) { var self = this; - var hash, pri, fee; + var hash, pri, fee, entry; if (args.help || args.length !== 3) { return callback(new RPCError('prioritisetransaction' @@ -1592,35 +1583,32 @@ RPC.prototype.prioritisetransaction = function prioritisetransaction(args, callb hash = utils.revHex(hash); - this.mempool.getEntry(hash, function(err, entry) { + entry = this.mempool.getEntry(hash); + + if (!entry) + return callback(new RPCError('Transaction not in mempool.')); + + entry.priority += pri; + entry.fees += fee; + + if (entry.priority < 0) + entry.priority = 0; + + if (entry.fees < 0) + entry.fees = 0; + + this.mempool.fillAllCoins(entry.tx, function(err) { if (err) return callback(err); - if (!entry) + if (!entry.tx.hasCoins()) return callback(new RPCError('Transaction not in mempool.')); - entry.priority += pri; - entry.fees += fee; - - if (entry.priority < 0) - entry.priority = 0; - - if (entry.fees < 0) - entry.fees = 0; - - this.mempool.fillAllCoins(entry.tx, function(err) { + self.mempool.addUnchecked(entry, function(err) { if (err) return callback(err); - if (!entry.tx.hasCoins()) - return callback(new RPCError('Transaction not in mempool.')); - - self.mempool.addUnchecked(entry, function(err) { - if (err) - return callback(err); - - callback(null, true); - }); + callback(null, true); }); }); }; diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index c7f3508c..021a7341 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -16,12 +16,11 @@ var bcoin = require('./env'); var AsyncObject = require('./async'); var constants = bcoin.protocol.constants; var utils = require('./utils'); +var RBT = require('./rbt'); var assert = utils.assert; var BufferWriter = require('./writer'); var BufferReader = require('./reader'); var VerifyError = bcoin.errors.VerifyError; -var pad32 = utils.pad32; -var DUMMY = new Buffer([0]); var ptrSize; /** @@ -80,17 +79,18 @@ function Mempool(options) { this.locker = new bcoin.locker(this, this.addTX); - this.db = bcoin.ldb({ - location: this.options.location, - db: this.options.db || 'memory' - }); - this.size = 0; this.waiting = {}; this.orphans = {}; this.totalOrphans = 0; this.spent = 0; this.total = 0; + this.tx = {}; + this.spents = {}; + this.coins = {}; + this.time = new RBT(timeCmp); + this.coinIndex = new AddressIndex(this); + this.txIndex = new AddressIndex(this); this.freeCount = 0; this.lastTime = 0; @@ -122,28 +122,7 @@ utils.inherits(Mempool, AsyncObject); */ Mempool.prototype._open = function open(callback) { - var self = this; - - // Clean the database before loading. The only - // reason for using an on-disk db for the mempool - // is not for persistence, but to keep ~300mb of - // txs out of main memory. - this.db.destroy(function(err) { - if (err) - return callback(err); - - self.db.open(function(err) { - if (err) - return callback(err); - - self.initialMemoryUsage(function(err) { - if (err) - return callback(err); - - self.chain.open(callback); - }); - }); - }); + this.chain.open(callback); }; /** @@ -153,7 +132,7 @@ Mempool.prototype._open = function open(callback) { */ Mempool.prototype._close = function destroy(callback) { - this.db.close(callback); + callback(); }; /** @@ -166,30 +145,6 @@ Mempool.prototype._lock = function _lock(func, args, force) { return this.locker.lock(func, args, force); }; -/** - * Tally up total memory usage from database. - * @param {Function} callback - Returns [Error, Number]. - */ - -Mempool.prototype.initialMemoryUsage = function initialMemoryUsage(callback) { - var self = this; - var i, tx; - - this.getHistory(function(err, txs) { - if (err) - return callback(err); - - for (i = 0; i < txs.length; i++) { - tx = txs[i]; - self.size += self.memUsage(tx); - self.spent += tx.inputs.length; - self.total++; - } - - return callback(); - }); -}; - /** * Notify the mempool that a new block has come * in (removes all transactions contained in the @@ -201,41 +156,39 @@ Mempool.prototype.initialMemoryUsage = function initialMemoryUsage(callback) { Mempool.prototype.addBlock = function addBlock(block, callback, force) { var self = this; var unlock = this._lock(addBlock, [block, callback], force); - var entries; + var len, entries, entry; if (!unlock) return; callback = utils.wrap(callback, unlock); + len = block.txs.length - 1; entries = []; utils.forRangeSerial(0, block.txs.length, function(i, next) { - var tx = block.txs[block.txs.length - 1 - i]; + var tx = block.txs[len--]; var hash = tx.hash('hex'); if (tx.isCoinbase()) return next(); - self.getEntry(hash, function(err, entry) { + entry = self.getEntry(hash); + + if (!entry) { + self.removeOrphan(hash); + return next(); + } + + self.removeUnchecked(entry, false, function(err) { if (err) return next(err); - if (!entry) { - self.removeOrphan(hash); - return next(); - } + self.emit('confirmed', tx, block); - self.removeUnchecked(entry, false, function(err) { - if (err) - return next(err); + entries.push(entry); - self.emit('confirmed', tx, block); - - entries.push(entry); - - return next(); - }, true); - }); + return next(); + }, true); }, function(err) { if (err) return callback(err); @@ -274,24 +227,19 @@ Mempool.prototype.removeBlock = function removeBlock(block, callback, force) { if (tx.isCoinbase()) return next(); - self.hasTX(hash, function(err, result) { + if (self.hasTX(hash)) + return next(); + + entry = MempoolEntry.fromTX(tx, block.height); + + self.addUnchecked(entry, function(err) { if (err) return next(err); - if (result) - return next(); + self.emit('unconfirmed', tx, block); - entry = MempoolEntry.fromTX(tx, block.height); - - self.addUnchecked(entry, function(err) { - if (err) - return next(err); - - self.emit('unconfirmed', tx, block); - - return next(); - }, true); - }); + return next(); + }, true); }, callback); }; @@ -304,59 +252,49 @@ Mempool.prototype.removeBlock = function removeBlock(block, callback, force) { Mempool.prototype.limitMempoolSize = function limitMempoolSize(entryHash, callback) { var self = this; var trimmed = false; + var end, entries, hashes, entry; if (this.getSize() <= this.maxSize) return callback(null, trimmed); - this.getRange({ - start: 0, - end: utils.now() - constants.mempool.MEMPOOL_EXPIRY - }, function(err, entries) { + end = utils.now() - constants.mempool.MEMPOOL_EXPIRY; + entries = this.getRange(0, end); + + utils.forEachSerial(entries, function(entry, next) { + if (self.getSize() <= self.maxSize) + return callback(null, trimmed); + + if (!trimmed) + trimmed = entry.tx.hash('hex') === entryHash; + + self.removeUnchecked(entry, true, next, true); + }, function(err) { if (err) return callback(err); - utils.forEachSerial(entries, function(entry, next) { + if (self.getSize() <= self.maxSize) + return callback(null, trimmed); + + hashes = self.getSnapshot(); + + utils.forEachSerial(hashes, function(hash, next) { if (self.getSize() <= self.maxSize) return callback(null, trimmed); + entry = self.getEntry(hash); + + if (!entry) + return next(); + if (!trimmed) - trimmed = entry.tx.hash('hex') === entryHash; + trimmed = hash === entryHash; self.removeUnchecked(entry, true, next, true); }, function(err) { if (err) return callback(err); - if (self.getSize() <= self.maxSize) - return callback(null, trimmed); - - self.getSnapshot(function(err, hashes) { - if (err) - return callback(err); - - utils.forEachSerial(hashes, function(hash, next) { - if (self.getSize() <= self.maxSize) - return callback(null, trimmed); - - self.getEntry(hash, function(err, entry) { - if (err) - return next(err); - - if (!entry) - return next(); - - if (!trimmed) - trimmed = hash === entryHash; - - self.removeUnchecked(entry, true, next, true); - }); - }, function(err) { - if (err) - return callback(err); - - return callback(null, trimmed); - }); - }); + return callback(null, trimmed); }); }); }; @@ -388,9 +326,21 @@ Mempool.prototype.limitOrphans = function limitOrphans() { */ Mempool.prototype.getTX = function getTX(hash, callback) { - return this.db.fetch('t/' + hash, function(data) { - return bcoin.tx.fromRaw(data); - }, callback); + var data = this.tx[hash]; + var tx; + + if (!data) + return; + + try { + tx = bcoin.tx.fromRaw(data); + } catch (e) { + delete this.tx[hash]; + this.logger.warning('Possible memory corruption.'); + return; + } + + return tx; }; /** @@ -401,9 +351,21 @@ Mempool.prototype.getTX = function getTX(hash, callback) { */ Mempool.prototype.getEntry = function getEntry(hash, callback) { - return this.db.fetch('t/' + hash, function(data) { - return MempoolEntry.fromRaw(data); - }, callback); + var data = this.tx[hash]; + var tx; + + if (!data) + return; + + try { + tx = MempoolEntry.fromRaw(data); + } catch (e) { + delete this.tx[hash]; + this.logger.warning('Possible memory corruption.'); + return; + } + + return tx; }; /** @@ -414,12 +376,24 @@ Mempool.prototype.getEntry = function getEntry(hash, callback) { */ Mempool.prototype.getCoin = function getCoin(hash, index, callback) { - return this.db.fetch('c/' + hash + '/' + index, function(data) { - var coin = bcoin.coin.fromRaw(data); + var key = hash + index; + var data = this.coins[key]; + var coin; + + if (!data) + return; + + try { + coin = bcoin.coin.fromRaw(data); coin.hash = hash; coin.index = index; - return coin; - }, callback); + } catch (e) { + delete this.coins[key]; + this.logger.warning('Possible memory corruption.'); + return; + } + + return coin; }; /** @@ -433,10 +407,22 @@ Mempool.prototype.getCoin = function getCoin(hash, index, callback) { */ Mempool.prototype.isSpent = function isSpent(hash, index, callback) { - return this.db.fetch('s/' + hash + '/' + index, function(data) { - assert(data.length === 32, 'Database corruption.'); - return data.toString('hex'); - }, callback); + var key = hash + index; + var data = this.spents[key]; + var spender; + + if (!data) + return; + + try { + spender = bcoin.outpoint.fromRaw(data); + } catch (e) { + delete this.spents[key]; + this.logger.warning('Possible memory corruption.'); + return; + } + + return spender; }; /** @@ -445,39 +431,25 @@ Mempool.prototype.isSpent = function isSpent(hash, index, callback) { * @param {Function} callback - Returns [Error, {@link Coin}[]]. */ -Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, callback) { - var self = this; +Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses) { var coins = []; + var i, j, coin, hash; if (!Array.isArray(addresses)) addresses = [addresses]; - utils.forEachSerial(addresses, function(address, next) { - address = bcoin.address.getHash(address, 'hex'); + for (i = 0; i < addresses.length; i++) { + hash = bcoin.address.getHash(addresses[i], 'hex'); + if (!hash) + continue; - if (!address) - return next(); + coin = this.coinIndex.searchCoin(hash); - self.db.lookup({ - gte: 'C/' + address, - lte: 'C/' + address + '~', - transform: function(key) { - key = key.split('/'); - return 'c/' + key[2] + '/' + key[3]; - }, - parse: function(data, key) { - var coin = bcoin.coin.fromRaw(data); - key = key.split('/'); - coin.hash = key[1]; - coin.index = +key[2]; - coins.push(coin); - } - }, next); - }, function(err) { - if (err) - return callback(err); - return callback(null, coins); - }); + for (j = 0; j < coin.length; j++) + coins.push(coin[j]); + } + + return coins; }; /** @@ -486,42 +458,25 @@ Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, call * @param {Function} callback - Returns [Error, {@link TX}[]]. */ -Mempool.prototype.getTXByAddress = function getTXByAddress(addresses, callback) { - var self = this; +Mempool.prototype.getTXByAddress = function getTXByAddress(addresses) { var txs = []; - var have = {}; + var i, j, tx, hash; if (!Array.isArray(addresses)) addresses = [addresses]; - utils.forEachSerial(addresses, function(address, next) { - address = bcoin.address.getHash(address, 'hex'); + for (i = 0; i < addresses.length; i++) { + hash = bcoin.address.getHash(addresses[i], 'hex'); + if (!hash) + continue; - if (!address) - return next(); + tx = this.txIndex.searchTX(hash); - self.db.lookup({ - gte: 'T/' + address, - lte: 'T/' + address + '~', - transform: function(key) { - var hash = key.split('/')[2]; + for (j = 0; j < tx.length; j++) + txs.push(tx[j]); + } - if (have[hash]) - return; - - have[hash] = true; - - return 't/' + hash; - }, - parse: function(data, key) { - txs.push(bcoin.tx.fromRaw(data)); - } - }, next); - }, function(err) { - if (err) - return callback(err); - return callback(null, txs); - }); + return txs; }; /** @@ -533,32 +488,26 @@ Mempool.prototype.getTXByAddress = function getTXByAddress(addresses, callback) * @param {Function} callback - Returns [Error, {@link TX}]. */ -Mempool.prototype.fillHistory = function fillHistory(tx, callback) { - var self = this; +Mempool.prototype.fillHistory = function fillHistory(tx) { + var i, input, prevout, prev; - if (tx.isCoinbase()) { - callback = utils.asyncify(callback); - return callback(null, tx); - } + if (tx.isCoinbase()) + return; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; - utils.forEachSerial(tx.inputs, function(input, next) { if (input.coin) - return next(); + continue; - self.getTX(input.prevout.hash, function(err, tx) { - if (err) - return next(err); + prevout = input.prevout; + prev = this.getTX(prevout.hash); - if (tx) - input.coin = bcoin.coin.fromTX(tx, input.prevout.index); + if (!prev) + continue; - next(); - }); - }, function(err) { - if (err) - return callback(err); - return callback(null, tx); - }); + input.coin = bcoin.coin.fromTX(prev, prevout.index); + } }; /** @@ -568,32 +517,26 @@ Mempool.prototype.fillHistory = function fillHistory(tx, callback) { * @param {Function} callback - Returns [Error, {@link TX}]. */ -Mempool.prototype.fillCoins = function fillCoins(tx, callback) { - var self = this; +Mempool.prototype.fillCoins = function fillCoins(tx) { + var i, input, prevout, coin; - if (tx.isCoinbase()) { - callback = utils.asyncify(callback); - return callback(null, tx); - } + if (tx.isCoinbase()) + return; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; - utils.forEachSerial(tx.inputs, function(input, next) { if (input.coin) - return next(); + continue; - self.getCoin(input.prevout.hash, input.prevout.index, function(err, coin) { - if (err) - return callback(err); + prevout = input.prevout; + coin = this.getCoin(prevout.hash, prevout.index); - if (coin) - input.coin = coin; + if (!coin) + continue; - next(); - }); - }, function(err) { - if (err) - return callback(err); - return callback(null, tx); - }); + input.coin = coin; + } }; /** @@ -603,7 +546,7 @@ Mempool.prototype.fillCoins = function fillCoins(tx, callback) { */ Mempool.prototype.hasTX = function hasTX(hash, callback) { - return this.db.has('t/' + hash, callback); + return this.tx[hash] != null; }; /** @@ -612,19 +555,23 @@ Mempool.prototype.hasTX = function hasTX(hash, callback) { * @param {Function} callback - Returns [Error, {@link TX}[]]. */ -Mempool.prototype.getRange = function getRange(options, callback) { - return this.db.lookup({ - gte: 'm/' + pad32(options.start) + '/', - lte: 'm/' + pad32(options.end) + '/~', - transform: function(key) { - return 't/' + key.split('/')[2]; - }, - parse: function(data, key) { - return MempoolEntry.fromRaw(data); - }, - limit: options.limit, - reverse: options.reverse - }, callback); +Mempool.prototype.getRange = function getRange(start, end, callback) { + var items = this.time.range(start, end); + var entries = []; + var i, item, hash, entry; + + for (i = 0; i < items.length; i++) { + item = items[i]; + hash = item.value.toString('hex'); + entry = this.getEntry(hash); + if (!entry) { + this.time.remove(item.key); + continue; + } + entries.push(entry); + } + + return entries; }; /** @@ -633,14 +580,14 @@ Mempool.prototype.getRange = function getRange(options, callback) { * @param {Function} callback - Returns [Error, Boolean]. */ -Mempool.prototype.has = function has(hash, callback) { +Mempool.prototype.has = function has(hash) { if (this.locker.hasPending(hash)) - return utils.asyncify(callback)(null, true); + return true; if (this.hasOrphan(hash)) - return utils.asyncify(callback)(null, true); + return true; - return this.hasTX(hash, callback); + return this.hasTX(hash); }; /** @@ -728,74 +675,64 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) { 0)); } - self.has(hash, function(err, exists) { + if (self.has(hash)) { + return callback(new VerifyError(tx, + 'alreadyknown', + 'txn-already-in-mempool', + 0)); + } + + self.chain.db.hasCoins(hash, function(err, exists) { if (err) return callback(err); if (exists) { return callback(new VerifyError(tx, 'alreadyknown', - 'txn-already-in-mempool', + 'txn-already-known', 0)); } - self.chain.db.hasCoins(hash, function(err, exists) { + if (self.isDoubleSpend(tx)) { + return callback(new VerifyError(tx, + 'duplicate', + 'bad-txns-inputs-spent', + 0)); + } + + self.fillAllCoins(tx, function(err) { if (err) return callback(err); - if (exists) { - return callback(new VerifyError(tx, - 'alreadyknown', - 'txn-already-known', - 0)); + if (!tx.hasCoins()) { + self.storeOrphan(tx); + return callback(); } - self.isDoubleSpend(tx, function(err, doubleSpend) { + entry = MempoolEntry.fromTX(tx, self.chain.height); + + self.verify(entry, function(err) { if (err) return callback(err); - if (doubleSpend) { - return callback(new VerifyError(tx, - 'duplicate', - 'bad-txns-inputs-spent', - 0)); - } - - self.fillAllCoins(tx, function(err) { + self.addUnchecked(entry, function(err) { if (err) return callback(err); - if (!tx.hasCoins()) { - self.storeOrphan(tx); - return callback(); - } - - entry = MempoolEntry.fromTX(tx, self.chain.height); - - self.verify(entry, function(err) { + self.limitMempoolSize(hash, function(err, trimmed) { if (err) return callback(err); - self.addUnchecked(entry, function(err) { - if (err) - return callback(err); + if (trimmed) { + return callback(new VerifyError(tx, + 'insufficientfee', + 'mempool full', + 0)); + } - self.limitMempoolSize(hash, function(err, trimmed) { - if (err) - return callback(err); - - if (trimmed) { - return callback(new VerifyError(tx, - 'insufficientfee', - 'mempool full', - 0)); - } - - return callback(); - }); - }, true); + return callback(); }); - }); + }, true); }); }); }); @@ -823,48 +760,45 @@ Mempool.prototype.addUnchecked = function addUnchecked(entry, callback, force) { callback = utils.wrap(callback, unlock); - this._addUnchecked(entry, function(err) { - if (err) - return callback(err); + this._addUnchecked(entry); - self.spent += entry.tx.inputs.length; - self.size += self.memUsage(entry.tx); - self.total++; - self.emit('tx', entry.tx); - self.emit('add tx', entry.tx); + this.spent += entry.tx.inputs.length; + this.size += this.memUsage(entry.tx); + this.total++; + this.emit('tx', entry.tx); + this.emit('add tx', entry.tx); - if (self.fees) - self.fees.processTX(entry, self.chain.isFull()); + if (this.fees) + this.fees.processTX(entry, this.chain.isFull()); - self.logger.debug('Added tx %s to mempool.', entry.tx.rhash); + this.logger.debug('Added tx %s to mempool.', entry.tx.rhash); - resolved = self.resolveOrphans(entry.tx); + resolved = this.resolveOrphans(entry.tx); - utils.forEachSerial(resolved, function(tx, next) { - var entry = MempoolEntry.fromTX(tx, self.chain.height); - self.verify(entry, function(err) { + utils.forEachSerial(resolved, function(tx, next) { + var entry = MempoolEntry.fromTX(tx, self.chain.height); + self.verify(entry, function(err) { + if (err) { + if (err.type === 'VerifyError') { + self.logger.debug('Could not resolve orphan %s: %s.', + tx.rhash, + err.message); + self.emit('bad orphan', tx, entry); + return next(); + } + self.emit('error', err); + return next(); + } + self.addUnchecked(entry, function(err) { if (err) { - if (err.type === 'VerifyError') { - self.logger.debug('Could not resolve orphan %s: %s.', - tx.rhash, - err.message); - self.emit('bad orphan', tx, entry); - return next(); - } self.emit('error', err); return next(); } - self.addUnchecked(entry, function(err) { - if (err) { - self.emit('error', err); - return next(); - } - self.logger.spam('Resolved orphan %s in mempool.', entry.tx.rhash); - next(); - }, true); - }); - }, callback); - }); + self.logger.spam('Resolved orphan %s in mempool.', entry.tx.rhash); + next(); + }, true); + }); + }, callback); }; /** @@ -972,7 +906,7 @@ Mempool.prototype.verify = function verify(entry, callback) { var mandatory = constants.flags.MANDATORY_VERIFY_FLAGS; var tx = entry.tx; var ret = {}; - var fee, modFee, now, size, rejectFee, minRelayFee, minRate; + var fee, modFee, now, size, rejectFee, minRelayFee, minRate, count; if (this.chain.state.hasWitness()) mandatory |= constants.flags.VERIFY_WITNESS; @@ -1058,46 +992,43 @@ Mempool.prototype.verify = function verify(entry, callback) { if (self.rejectAbsurdFees && fee > minRelayFee * 10000) return callback(new VerifyError(tx, 'highfee', 'absurdly-high-fee', 0)); - self.countAncestors(tx, function(err, count) { + count = self.countAncestors(tx); + + if (count > constants.mempool.ANCESTOR_LIMIT) { + return callback(new VerifyError(tx, + 'nonstandard', + 'too-long-mempool-chain', + 0)); + } + + if (!tx.checkInputs(height, ret)) + return callback(new VerifyError(tx, 'invalid', ret.reason, ret.score)); + + // Do this in the worker pool. + tx.verifyAsync(flags, function(err, result) { if (err) return callback(err); - if (count > constants.mempool.ANCESTOR_LIMIT) { - return callback(new VerifyError(tx, - 'nonstandard', - 'too-long-mempool-chain', - 0)); - } - - if (!tx.checkInputs(height, ret)) - return callback(new VerifyError(tx, 'invalid', ret.reason, ret.score)); - - // Do this in the worker pool. - tx.verifyAsync(flags, function(err, result) { - if (err) - return callback(err); - - if (!result) { - return tx.verifyAsync(mandatory, function(err, result) { - if (err) - return callback(err); - - if (result) { - return callback(new VerifyError(tx, - 'nonstandard', - 'non-mandatory-script-verify-flag', - 0)); - } + if (!result) { + return tx.verifyAsync(mandatory, function(err, result) { + if (err) + return callback(err); + if (result) { return callback(new VerifyError(tx, 'nonstandard', - 'mandatory-script-verify-flag', + 'non-mandatory-script-verify-flag', 0)); - }); - } + } - return callback(); - }); + return callback(new VerifyError(tx, + 'nonstandard', + 'mandatory-script-verify-flag', + 0)); + }); + } + + return callback(); }); }); }; @@ -1109,39 +1040,22 @@ Mempool.prototype.verify = function verify(entry, callback) { * @param {Function} callback - Returns [Error, Number]. */ -Mempool.prototype.countAncestors = function countAncestors(tx, callback) { - var self = this; +Mempool.prototype.countAncestors = function countAncestors(tx) { var max = 0; + var i, input, count, prev; - utils.forEachSerial(tx.inputs, function(input, next) { - var count = 0; - self.getTX(input.prevout.hash, function(err, tx) { - if (err) - return next(err); + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + prev = this.getTX(input.prevout.hash); + if (!prev) + continue; + count = 1; + count += this.countAncestors(prev); + if (count > max) + max = count; + } - if (!tx) - return next(); - - count += 1; - - self.countAncestors(tx, function(err, prev) { - if (err) - return next(err); - - count += prev; - - if (count > max) - max = count; - - next(); - }); - }); - }, function(err) { - if (err) - return callback(err); - - return callback(null, max); - }); + return max; }; /** @@ -1194,30 +1108,17 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx) { */ Mempool.prototype.getBalance = function getBalance(callback) { + var keys = Object.keys(this.coins); var total = 0; - var i; + var i, key, data; - return this.db.iterate({ - gte: 'c', - lte: 'c~', - values: true, - parse: function(data, key) { - assert(data.length >= 16); - return utils.read64N(data, 8); - } - }, function(err, coins) { - if (err) - return callback(err); + for (i = 0; i < keys.length; i++) { + key = keys[i]; + data = this.coins[key]; + total += utils.read64N(data, 8); + } - for (i = 0; i < coins.length; i++) - total += coins[i]; - - return callback(null, { - confirmed: 0, - unconfirmed: total, - total: total - }); - }); + return total; }; /** @@ -1226,14 +1127,19 @@ Mempool.prototype.getBalance = function getBalance(callback) { */ Mempool.prototype.getHistory = function getHistory(callback) { - return this.db.iterate({ - gte: 't', - lte: 't~', - values: true, - parse: function(data, key) { - return bcoin.tx.fromRaw(data); - } - }, callback); + var keys = Object.keys(this.tx); + var txs = []; + var i, key, tx; + + for (i = 0; i < keys.length; i++) { + key = keys[i]; + tx = this.getTX(key); + if (!tx) + continue; + txs.push(tx); + } + + return txs; }; /** @@ -1243,13 +1149,14 @@ Mempool.prototype.getHistory = function getHistory(callback) { */ Mempool.prototype.getOrphan = function getOrphan(hash) { - var orphan = this.orphans[hash]; + var data = this.orphans[hash]; + var orphan; - if (!orphan) + if (!data) return; try { - orphan = bcoin.tx.fromExtended(orphan, true); + orphan = bcoin.tx.fromExtended(data, true); } catch (e) { delete this.orphans[hash]; this.logger.warning('%s %s', @@ -1365,17 +1272,12 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx) { */ Mempool.prototype.fillAllHistory = function fillAllHistory(tx, callback) { - var self = this; + this.fillHistory(tx); - this.fillHistory(tx, function(err) { - if (err) - return callback(err); + if (tx.hasCoins()) + return callback(null, tx); - if (tx.hasCoins()) - return callback(null, tx); - - self.chain.db.fillCoins(tx, callback); - }); + this.chain.db.fillCoins(tx, callback); }; /** @@ -1391,44 +1293,36 @@ Mempool.prototype.fillAllCoins = function fillAllCoins(tx, callback) { var self = this; var doubleSpend = false; - this.fillCoins(tx, function(err) { + this.fillCoins(tx); + + if (tx.hasCoins()) + return callback(null, tx); + + utils.forEachSerial(tx.inputs, function(input, next) { + var hash = input.prevout.hash; + var index = input.prevout.index; + + if (self.isSpent(hash, index)) { + doubleSpend = true; + return next(); + } + + self.chain.db.getCoin(hash, index, function(err, coin) { + if (err) + return next(err); + + if (!coin) + return next(); + + input.coin = coin; + + next(); + }); + }, function(err) { if (err) return callback(err); - if (tx.hasCoins()) - return callback(null, tx); - - utils.forEachSerial(tx.inputs, function(input, next) { - var hash = input.prevout.hash; - var index = input.prevout.index; - - self.isSpent(hash, index, function(err, spent) { - if (err) - return callback(err); - - if (spent) { - doubleSpend = true; - return next(); - } - - self.chain.db.getCoin(hash, index, function(err, coin) { - if (err) - return next(err); - - if (!coin) - return next(); - - input.coin = coin; - - next(); - }); - }); - }, function(err) { - if (err) - return callback(err); - - return callback(null, tx, doubleSpend); - }); + return callback(null, tx, doubleSpend); }); }; @@ -1438,14 +1332,8 @@ Mempool.prototype.fillAllCoins = function fillAllCoins(tx, callback) { * @param {Function} callback - Returns [Error, {@link Hash}[]]. */ -Mempool.prototype.getSnapshot = function getSnapshot(callback) { - return this.db.iterate({ - gte: 't', - lte: 't~', - transform: function(key) { - return key.split('/')[1]; - } - }, callback); +Mempool.prototype.getSnapshot = function getSnapshot() { + return Object.keys(this.tx); }; /** @@ -1469,19 +1357,17 @@ Mempool.prototype.checkLocks = function checkLocks(tx, flags, callback) { * @param {Function} callback - Returns [Error, Boolean]. */ -Mempool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { - var self = this; - utils.everySerial(tx.inputs, function(input, next) { - self.isSpent(input.prevout.hash, input.prevout.index, function(err, spent) { - if (err) - return next(err); - return next(null, !spent); - }); - }, function(err, result) { - if (err) - return callback(err); - return callback(null, !result); - }); +Mempool.prototype.isDoubleSpend = function isDoubleSpend(tx) { + var i, input, prevout; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + prevout = input.prevout; + if (this.isSpent(prevout.hash, prevout.index)) + return true; + } + + return false; }; /** @@ -1493,67 +1379,43 @@ Mempool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { Mempool.prototype.getConfidence = function getConfidence(hash, callback) { var self = this; + var tx; callback = utils.asyncify(callback); - function getTX(callback) { - if (hash instanceof bcoin.tx) - return callback(null, hash, hash.hash('hex')); - - return self.getTX(hash, function(err, tx) { - if (err) - return callback(err); - return callback(null, tx, hash); - }); + if (hash instanceof bcoin.tx) { + tx = hash; + hash = hash.hash('hex'); + } else { + tx = self.getTX(hash); } - function isDoubleSpend(tx, callback) { - if (tx) - return self.isDoubleSpend(tx, callback); - return callback(null, false); - } + if (self.hasTX(hash)) + return callback(null, constants.confidence.PENDING); - return getTX(function(err, tx, hash) { - if (err) - return callback(err); + if (tx && self.isDoubleSpend(tx)) + return callback(null, constants.confidence.INCONFLICT); - return self.hasTX(hash, function(err, result) { + if (tx && tx.block) { + return self.chain.db.isMainChain(tx.block, function(err, result) { if (err) return callback(err); if (result) - return callback(null, constants.confidence.PENDING); + return callback(null, constants.confidence.BUILDING); - return isDoubleSpend(tx, function(err, result) { - if (err) - return callback(err); - - if (result) - return callback(null, constants.confidence.INCONFLICT); - - if (tx && tx.block) { - return self.chain.db.isMainChain(tx.block, function(err, result) { - if (err) - return callback(err); - - if (result) - return callback(null, constants.confidence.BUILDING); - - return callback(null, constants.confidence.DEAD); - }); - } - - return self.chain.db.hasCoins(hash, function(err, existing) { - if (err) - return callback(err); - - if (existing) - return callback(null, constants.confidence.BUILDING); - - return callback(null, constants.confidence.UNKNOWN); - }); - }); + return callback(null, constants.confidence.DEAD); }); + } + + self.chain.db.hasCoins(hash, function(err, existing) { + if (err) + return callback(err); + + if (existing) + return callback(null, constants.confidence.BUILDING); + + return callback(null, constants.confidence.UNKNOWN); }); }; @@ -1564,60 +1426,49 @@ Mempool.prototype.getConfidence = function getConfidence(hash, callback) { * @param {Function} callback */ -Mempool.prototype._addUnchecked = function _addUnchecked(entry, callback) { +Mempool.prototype._addUnchecked = function _addUnchecked(entry) { var tx = entry.tx; var hash = tx.hash('hex'); - var i, addresses, address, input, output, key, coin, batch; + var i, input, output, key, coin, spender; - batch = this.db.batch(); + this.tx[hash] = entry.toRaw(); + this.time.insert(entry.ts, hash); - batch.put('t/' + hash, entry.toRaw()); - batch.put('m/' + pad32(entry.ts) + '/' + hash, DUMMY); - - if (this.options.indexAddress) { - addresses = tx.getHashes('hex'); - for (i = 0; i < addresses.length; i++) - batch.put('T/' + addresses[i] + '/' + hash, DUMMY); - } + if (this.options.indexAddress) + this.indexTX.addTX(tx); for (i = 0; i < tx.inputs.length; i++) { input = tx.inputs[i]; - key = input.prevout.hash + '/' + input.prevout.index; + key = input.prevout.hash + input.prevout.index; if (tx.isCoinbase()) break; assert(input.coin); - if (this.options.indexAddress) { - address = input.getHash('hex'); - if (address) - batch.del('C/' + address + '/' + key); - } + if (this.options.indexAddress) + this.coinIndex.removeCoin(input.coin); - batch.del('c/' + key); - batch.put('s/' + key, tx.hash()); + spender = bcoin.outpoint.fromTX(tx, i).toRaw(); + + delete this.coins[key]; + this.spents[key] = spender; } for (i = 0; i < tx.outputs.length; i++) { output = tx.outputs[i]; - key = hash + '/' + i; + key = hash + i; if (output.script.isUnspendable()) continue; - if (this.options.indexAddress) { - address = output.getHash('hex'); - if (address) - batch.put('C/' + address + '/' + key, DUMMY); - } + coin = bcoin.coin.fromTX(tx, i); - coin = bcoin.coin.fromTX(tx, i).toRaw(); + if (this.options.indexAddress) + this.coinIndex.addCoin(coin); - batch.put('c/' + key, coin); + this.coins[key] = coin.toRaw(); } - - return batch.write(callback); }; /** @@ -1647,30 +1498,26 @@ Mempool.prototype._removeUnchecked = function _removeUnchecked(entry, limit, cal var self = this; var tx = entry.tx; var hash = tx.hash('hex'); - var batch = this.db.batch(); - var i, addresses, input, output, key, address; + var i, input, output, key, coin; this._removeSpenders(entry, limit, function(err) { if (err) return callback(err); - batch.del('t/' + hash); - batch.del('m/' + pad32(entry.ts) + '/' + hash); + delete self.tx[hash]; + self.time.remove(entry.ts); - if (self.options.indexAddress) { - addresses = tx.getHashes('hex'); - for (i = 0; i < addresses.length; i++) - batch.del('T/' + addresses[i] + '/' + hash); - } + if (self.options.indexAddress) + self.txIndex.addTX(tx); for (i = 0; i < tx.inputs.length; i++) { input = tx.inputs[i]; - key = input.prevout.hash + '/' + input.prevout.index; + key = input.prevout.hash + input.prevout.index; if (tx.isCoinbase()) break; - batch.del('s/' + key); + delete self.spents[key]; // We only disconnect inputs if this // is a limited transaction. For block @@ -1685,32 +1532,28 @@ Mempool.prototype._removeUnchecked = function _removeUnchecked(entry, limit, cal if (input.coin.height !== -1) continue; - if (self.options.indexAddress) { - address = input.getHash('hex'); - if (address) - batch.put('C/' + address + '/' + key, DUMMY); - } + if (self.options.indexAddress) + self.coinIndex.removeCoin(input.coin); - batch.put('c/' + key, input.coin.toRaw()); + self.coins[key] = input.coin.toRaw(); } for (i = 0; i < tx.outputs.length; i++) { output = tx.outputs[i]; - key = hash + '/' + i; + key = hash + i; if (output.script.isUnspendable()) continue; if (self.options.indexAddress) { - address = output.getHash('hex'); - if (address) - batch.del('C/' + address + '/' + key); + coin = bcoin.coin.fromTX(tx, i); + self.coinIndex.removeCoin(coin); } - batch.del('c/' + key); + delete self.coins[key]; } - return batch.write(callback); + return callback(); }); }; @@ -1725,7 +1568,7 @@ Mempool.prototype._removeUnchecked = function _removeUnchecked(entry, limit, cal Mempool.prototype._removeSpenders = function _removeSpenders(entry, limit, callback) { var self = this; var tx = entry.tx; - var hash; + var hash, spender; // We do not remove spenders if this is // being removed for a block. The spenders @@ -1737,23 +1580,17 @@ Mempool.prototype._removeSpenders = function _removeSpenders(entry, limit, callb hash = tx.hash('hex'); utils.forEachSerial(tx.outputs, function(output, next, i) { - self.isSpent(hash, i, function(err, spender) { - if (err) - return next(err); + spender = self.isSpent(hash, i); - if (!spender) - return next(); + if (!spender) + return next(); - self.getEntry(spender, function(err, entry) { - if (err) - return next(err); + entry = self.getEntry(spender.hash); - if (!entry) - return next(); + if (!entry) + return next(); - self.removeUnchecked(entry, limit, next, true); - }); - }); + self.removeUnchecked(entry, limit, next, true); }, callback); }; @@ -2112,6 +1949,120 @@ function mallocUsage(alloc) { return ((alloc + 15) >>> 3) << 3; } +/** + * Address Index + */ + +function AddressIndex(mempool) { + this.mempool = mempool; + this.tree = new RBT(); +} + +AddressIndex.prototype.search = function(hash) { + return this.tree.range(hash + '/', hash + '/~'); +}; + +AddressIndex.prototype.set = function set(hash, postfix, value) { + return this.tree.insert(hash + '/' + postfix, value); +}; + +AddressIndex.prototype.remove = function remove(hash, postfix) { + return this.tree.remove(hash + '/' + postfix); +}; + +AddressIndex.prototype.searchCoin = function searchCoin(address) { + var items = this.search(address); + var out = []; + var i, item, outpoint, coin; + + for (i = 0; i < items.length; i++) { + item = items[i]; + + try { + outpoint = bcoin.outpoint.fromRaw(item.value); + } catch (e) { + this.tree.remove(item.key); + continue; + } + + coin = this.mempool.getCoin(outpoint.hash, outpoint.index); + + if (!coin) { + this.tree.remove(item.key); + continue; + } + + out.push(coin); + } + + return out; +}; + +AddressIndex.prototype.searchTX = function searchTX(address) { + var items = this.search(address); + var out = []; + var i, item, hash, tx; + + for (i = 0; i < items.length; i++) { + item = items[i]; + hash = item.value.toString('hex'); + tx = this.mempool.getEntry(hash); + + if (!tx) { + this.tree.remove(item.key); + continue; + } + + out.push(tx); + } + + return out; +}; + +AddressIndex.prototype.addTX = function addTX(tx) { + var hashes = tx.getHashes('hex'); + var i, hash; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + this.set(hash, tx.hash('hex'), tx.hash()); + } +}; + +AddressIndex.prototype.removeTX = function removeTX(tx) { + var hashes = tx.getHashes('hex'); + var i, hash; + + for (i = 0; i < hashes.length; i++) { + hash = hashes[i]; + this.remove(hash, tx.hash('hex')); + } +}; + +AddressIndex.prototype.addCoin = function addCoin(coin) { + var hash = coin.getHash('hex'); + var outpoint; + + if (!hash) + return; + + outpoint = bcoin.outpoint(coin.hash, coin.index); + this.set(hash, coin.hash + coin.index, outpoint.toRaw()); +}; + +AddressIndex.prototype.removeCoin = function removeCoin(coin) { + var hash = coin.getHash('hex'); + + if (!hash) + return; + + this.remove(hash, coin.hash + coin.index); +}; + +function timeCmp(a, b) { + return a - b; +} + /* * Expose */ diff --git a/lib/bcoin/peer.js b/lib/bcoin/peer.js index 3e77d3e8..17be2078 100644 --- a/lib/bcoin/peer.js +++ b/lib/bcoin/peer.js @@ -1124,7 +1124,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { if (!payload.mempool) return callback(); - self.mempool.getCoin(hash, index, callback); + callback(null, self.mempool.getCoin(hash, index)); } function isSpent(hash, index, callback) { @@ -1134,7 +1134,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) { if (!payload.mempool) return callback(null, false); - self.mempool.isSpent(hash, index, callback); + callback(null, self.mempool.isSpent(hash, index)); } if (payload.prevout.length > 15) @@ -1427,7 +1427,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) { Peer.prototype._handleMempool = function _handleMempool() { var self = this; var items = []; - var i; + var i, hashes; var unlock = this.locker.lock(_handleMempool, []); if (!unlock) @@ -1450,18 +1450,14 @@ Peer.prototype._handleMempool = function _handleMempool() { if (this.pool.options.selfish) return done(); - this.mempool.getSnapshot(function(err, hashes) { - if (err) - return done(err); + hashes = this.mempool.getSnapshot(); - for (i = 0; i < hashes.length; i++) - items.push(new InvItem(constants.inv.TX, hashes[i])); + for (i = 0; i < hashes.length; i++) + items.push(new InvItem(constants.inv.TX, hashes[i])); - self.logger.debug('Sending mempool snapshot (%s).', self.hostname); + self.logger.debug('Sending mempool snapshot (%s).', self.hostname); - self.sendInv(items); - done(); - }); + self.sendInv(items); }; /** @@ -1494,7 +1490,7 @@ Peer.prototype._getItem = function _getItem(item, callback) { if (item.isTX()) { if (!this.mempool) return callback(); - return this.mempool.getEntry(item.hash, callback); + return callback(null, this.mempool.getEntry(item.hash)); } if (this.chain.db.options.spv) @@ -1941,6 +1937,7 @@ Peer.prototype._handleSendCmpct = function _handleSendCmpct(payload) { Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) { var self = this; var hash = block.hash('hex'); + var result; function done(err) { if (err) { @@ -1970,31 +1967,28 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) { // Sort of a lock too. this.compactBlocks[hash] = block; - block.fillMempool(this.mempool, function(err, result) { - if (err) - return done(err); + result = block.fillMempool(this.mempool); - if (result) { - delete self.compactBlocks[hash]; - self.emit('block', block.toBlock()); - self.logger.debug( - 'Received full compact block %s (%s).', - block.rhash, self.hostname); - return; - } + if (result) { + delete this.compactBlocks[hash]; + this.fire('block', block.toBlock()); + this.logger.debug( + 'Received full compact block %s (%s).', + block.rhash, this.hostname); + return; + } - self.write(self.framer.getBlockTxn(block.toRequest())); + this.write(this.framer.getBlockTxn(block.toRequest())); + this.logger.debug( + 'Received semi-full compact block %s (%s).', + block.rhash, this.hostname); + + block.startTimeout(10000, function() { self.logger.debug( - 'Received semi-full compact block %s (%s).', + 'Compact block timed out: %s (%s).', block.rhash, self.hostname); - - block.startTimeout(10000, function() { - self.logger.debug( - 'Compact block timed out: %s (%s).', - block.rhash, self.hostname); - delete self.compactBlocks[hash]; - }); + delete self.compactBlocks[hash]; }); }; diff --git a/lib/bcoin/pool.js b/lib/bcoin/pool.js index 04bc4042..11b09d11 100644 --- a/lib/bcoin/pool.js +++ b/lib/bcoin/pool.js @@ -1563,7 +1563,7 @@ Pool.prototype.has = function has(type, hash, force, callback) { } // Check the mempool. - return this.mempool.has(hash, check); + return check(null, this.mempool.has(hash)); } // Check the chain. diff --git a/lib/bcoin/rbt.js b/lib/bcoin/rbt.js index 44257b55..cb2100fd 100644 --- a/lib/bcoin/rbt.js +++ b/lib/bcoin/rbt.js @@ -37,6 +37,9 @@ function RBT(location, options) { if (!options) options = {}; + if (typeof options === 'function') + options = { compare: options }; + this.options = options; this.root = SENTINEL; this.compare = options.compare || utils.cmp; diff --git a/test/block-test.js b/test/block-test.js index 15424f25..ebb83a61 100644 --- a/test/block-test.js +++ b/test/block-test.js @@ -224,23 +224,21 @@ describe('Block', function() { var fakeMempool = { getSnapshot: function(callback) { - callback(null, Object.keys(map)); + return Object.keys(map); }, getTX: function(hash, callback) { - callback(null, map[hash]); + return map[hash]; } }; assert.equal(cblock.sid(block.txs[1].hash()), 125673511480291); - cblock.fillMempool(fakeMempool, function(err, result) { - assert.ifError(err); - assert(result); - for (var i = 0; i < cblock.available.length; i++) - assert(cblock.available[i]); - assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex')); - cb(); - }); + var result = cblock.fillMempool(fakeMempool); + assert(result); + for (var i = 0; i < cblock.available.length; i++) + assert(cblock.available[i]); + assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex')); + cb(); }); it('should handle half-full compact block', function(cb) { @@ -262,39 +260,37 @@ describe('Block', function() { var fakeMempool = { getSnapshot: function(callback) { - callback(null, keys); + return keys; }, getTX: function(hash, callback) { - callback(null, map[hash]); + return map[hash]; } }; assert.equal(cblock.sid(block.txs[1].hash()), 125673511480291); - cblock.fillMempool(fakeMempool, function(err, result) { - assert.ifError(err); - assert(!result); + var result = cblock.fillMempool(fakeMempool); + assert(!result); - var req = cblock.toRequest(); - assert.equal(req.hash, cblock.hash('hex')); - assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]); + var req = cblock.toRequest(); + assert.equal(req.hash, cblock.hash('hex')); + assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]); - req = bip152.TXRequest.fromRaw(req.toRaw()); - assert.equal(req.hash, cblock.hash('hex')); - assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]); + req = bip152.TXRequest.fromRaw(req.toRaw()); + assert.equal(req.hash, cblock.hash('hex')); + assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]); - var res = bip152.TXResponse.fromBlock(block, req); - res = bip152.TXResponse.fromRaw(res.toRaw()); + var res = bip152.TXResponse.fromBlock(block, req); + res = bip152.TXResponse.fromRaw(res.toRaw()); - var result = cblock.fillMissing(res); - assert(result); + var result = cblock.fillMissing(res); + assert(result); - for (var i = 0; i < cblock.available.length; i++) - assert(cblock.available[i]); + for (var i = 0; i < cblock.available.length; i++) + assert(cblock.available[i]); - assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex')); + assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex')); - cb(); - }); + cb(); }); }); diff --git a/test/mempool-test.js b/test/mempool-test.js index 16b3d355..1fceab09 100644 --- a/test/mempool-test.js +++ b/test/mempool-test.js @@ -126,41 +126,30 @@ describe('Mempool', function() { assert.ifError(err); mempool.addTX(t4, function(err) { assert.ifError(err); - mempool.getBalance(function(err, balance) { + var balance = mempool.getBalance(); + assert.equal(balance, 0); + mempool.addTX(t1, function(err) { assert.ifError(err); - assert.equal(balance.total, 0); - mempool.addTX(t1, function(err) { + var balance = mempool.getBalance(); + assert.equal(balance, 60000); + mempool.addTX(t2, function(err) { assert.ifError(err); - mempool.getBalance(function(err, balance) { + var balance = mempool.getBalance(); + assert.equal(balance, 50000); + mempool.addTX(t3, function(err) { assert.ifError(err); - assert.equal(balance.total, 60000); - mempool.addTX(t2, function(err) { + var balance = mempool.getBalance(); + assert.equal(balance, 22000); + mempool.addTX(f1, function(err) { assert.ifError(err); - mempool.getBalance(function(err, balance) { - assert.ifError(err); - assert.equal(balance.total, 50000); - mempool.addTX(t3, function(err) { - assert.ifError(err); - mempool.getBalance(function(err, balance) { - assert.ifError(err); - assert.equal(balance.total, 22000); - mempool.addTX(f1, function(err) { - assert.ifError(err); - mempool.getBalance(function(err, balance) { - assert.ifError(err); - assert.equal(balance.total, 20000); - mempool.getHistory(function(err, txs) { - assert(txs.some(function(tx) { - return tx.hash('hex') === f1.hash('hex'); - })); + var balance = mempool.getBalance(); + assert.equal(balance, 20000); + var txs = mempool.getHistory(); + assert(txs.some(function(tx) { + return tx.hash('hex') === f1.hash('hex'); + })); - cb(); - }); - }); - }); - }); - }); - }); + cb(); }); }); });