diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index b6032e0a..f1bf5d35 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -37,10 +37,14 @@ function Mempool(node, options) { this.locker = new bcoin.locker(this, this.add, 20 << 20); - this.db = null; - this.tx = null; - this.size = 0; - this.orphans = 0; + this.totalSize = 0; + this.totalOrphans = 0; + this.coins = {}; + this.txs = {}; + this.addressMap = new AddressMap; + this.orphans = {}; + this.waiting = {}; + this.spent = {}; this.freeCount = 0; this.lastTime = 0; @@ -82,48 +86,19 @@ Mempool.prototype._init = function _init() { assert(unlock); - bcoin.ldb.destroy('mempool', 'memdown', function(err) { - if (err) { - unlock(); - return self.emit('error', err); - } + this.chain.open(function(err) { + unlock(); - self.db = bcoin.ldb('mempool', { - db: 'memdown' - }); + if (err) + self.emit('error', err); - self.tx = new bcoin.txdb('m', self.db, { - indexExtra: false, - indexAddress: false, - mapAddress: false, - verify: false - }); - - self.db.open(function(err) { - if (err) { - unlock(); - return self.emit('error', err); - } - self.dynamicMemoryUsage(function(err, size) { - if (err) - self.emit('error', err); - else - self.size = size; - - self.chain.open(function(err) { - if (err) - self.emit('error', err); - unlock(); - self.loaded = true; - self.emit('open'); - }); - }); - }); + self.loaded = true; + self.emit('open'); }); }; Mempool.prototype.dynamicMemoryUsage = function dynamicMemoryUsage(callback) { - return this.db.approximateSize('m', 'm~', callback); + return utils.asyncify(callback)(null, this.totalSize); }; Mempool.prototype.open = function open(callback) { @@ -135,7 +110,7 @@ Mempool.prototype.open = function open(callback) { Mempool.prototype.close = Mempool.prototype.destroy = function destroy(callback) { - this.db.close(utils.ensure(callback)); + return utils.nextTick(utils.ensure(callback)); }; Mempool.prototype.addBlock = function addBlock(block, callback, force) { @@ -167,10 +142,10 @@ Mempool.prototype.removeBlock = function removeBlock(block, callback, force) { Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) { var self = this; - if (this.size <= Mempool.MAX_MEMPOOL_SIZE) + if (this.totalSize <= Mempool.MAX_MEMPOOL_SIZE) return callback(null, true); - this.db.getRange({ + this.getRange({ start: 0, end: utils.now() - Mempool.MEMPOOL_EXPIRY }, function(err, txs) { @@ -187,104 +162,251 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) { if (err) return callback(err); - return callback(self.size <= Mempool.MAX_MEMPOOL_SIZE); + return callback(self.totalSize <= Mempool.MAX_MEMPOOL_SIZE); }); }); }); }; +Mempool.prototype.getRange = function getRange(options, callback) { + return callback(null, []); +}; + Mempool.prototype.purgeOrphans = function purgeOrphans(callback) { var self = this; var batch = this.db.batch(); callback = utils.ensure(callback); - utils.forEachSerial(['m/D', 'm/d'], function(type, callback) { - var iter = self.db.iterator({ - gte: type, - lte: type + '~', - keys: true, - values: false, - fillCache: false, - keyAsBuffer: false - }); + this.waiting = {}; + this.totalOrphans = 0; - (function next() { - iter.next(function(err, key, value) { - if (err) { - return iter.end(function() { - callback(err); - }); - } - - if (key === undefined) - return iter.end(callback); - - batch.del(key); - - next(); - }); - })(); - }, function(err) { - if (err) - return callback(err); - - batch.write(function(err) { - if (err) - return callback(err); - - self.dynamicMemoryUsage(function(err, size) { - if (err) - return callback(err); - - self.size = size; - self.orphans = 0; - - return callback(); - }); - }); + Object.keys(this.orphans).forEach(function(key) { + self.totalSize -= self.orphans[key].length; + delete self.orphans[key]; }); + + return utils.nextTick(callback); +}; + +Mempool.prototype.getTXSync = function getTXSync(hash) { + var tx; + + if (hash instanceof bcoin.tx) + hash = hash.hash('hex'); + + tx = this.txs[hash]; + + if (!tx) + return; + + return bcoin.tx.fromExtended(tx); +}; + +Mempool.prototype.getCoinSync = function getCoinSync(hash, index) { + var key = hash + '/' + index; + var coin; + + coin = this.coins[key]; + + if (!coin) + return; + + coin = bcoin.coin.fromRaw(coin); + coin.hash = hash; + coin.index = index; + + return coin; +}; + +Mempool.prototype.isSpentSync = function isSpentSync(hash, index) { + var key = hash + '/' + index; + + return this.spent[key]; +}; + +Mempool.prototype.isDoubleSpendSync = function isDoubleSpendSync(tx) { + var i, input; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + if (this.isSpentSync(input.prevout.hash, input.prevout.index)) + return true; + } + + return false; }; Mempool.prototype.get = Mempool.prototype.getTX = function getTX(hash, callback) { - if (hash instanceof bcoin.tx) - hash = hash.hash('hex'); - return this.tx.getTX(hash, callback); + callback = utils.asyncify(callback); + + try { + return callback(null, this.getTXSync(hash)); + } catch (e) { + return callback(e); + } }; Mempool.prototype.getCoin = function getCoin(hash, index, callback) { - return this.tx.getCoin(hash, index, callback); + callback = utils.asyncify(callback); + + try { + return callback(null, this.getCoinSync(hash, index)); + } catch (e) { + return callback(e); + } }; Mempool.prototype.isSpent = function isSpent(hash, index, callback) { - return this.tx.isSpent(hash, index, callback); + callback = utils.asyncify(callback); + + return callback(null, this.isSpentSync(hash, index)); +}; + +Mempool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { + callback = utils.asyncify(callback); + return callback(null, this.isDoubleSpendSync(tx)); }; Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, callback) { - return this.tx.getCoinsByAddress(addresses, callback); + var uniq = {}; + var coins = []; + var i, j, address, keys, key, coin, parts, hash, index; + + callback = utils.asyncify(callback); + + if (!Array.isArray(addresses)) + addresses = [addresses]; + + addresses = utils.uniqs(addresses); + + for (i = 0; i < addresses.length; i++) { + address = addresses[i]; + keys = this.addressMap.getKeys(address); + for (j = 0; j < keys.length; j++) { + key = keys[j]; + parts = key.split('/'); + hash = parts[0]; + index = +parts[1]; + + try { + coin = this.getCoinSync(hash, index); + } catch (e) { + return callback(e); + } + + if (!coin) + continue; + + coins.push(coin); + } + } + + return callback(null, coins); }; Mempool.prototype.getByAddress = Mempool.prototype.getTXByAddress = function getTXByAddress(addresses, callback) { - return this.tx.getTXByAddress(addresses, callback); + var uniq = {}; + var txs = []; + var i, j, address, hashes, hash, tx; + + callback = utils.asyncify(callback); + + if (!Array.isArray(addresses)) + addresses = [addresses]; + + addresses = utils.uniqs(addresses); + + for (i = 0; i < addresses.length; i++) { + address = addresses[i]; + hashes = this.addressMap.getKeys(address); + for (j = 0; j < hashes.length; j++) { + hash = hashes[j]; + + if (uniq[hash]) + continue; + + try { + tx = this.getTXSync(hash); + } catch (e) { + return callback(e); + } + + if (!tx) + continue; + + uniq[hash] = true; + + txs.push(tx); + } + } + + return callback(null, txs); }; Mempool.prototype.fillTX = function fillTX(tx, callback) { - return this.tx.fillTX(tx, callback); + var i, input, tx; + + callback = utils.asyncify(callback); + + if (tx.isCoinbase()) + return callback(null, tx); + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + + if (input.coin) + continue; + + try { + tx = this.getTXSync(input.prevout.hash); + } catch (e) { + return callback(e); + } + + if (!tx) + continue; + + input.coin = bcoin.coin(tx, input.prevout.index); + } + + return callback(null, tx); }; Mempool.prototype.fillCoins = function fillCoins(tx, callback) { - return this.tx.fillCoins(tx, callback); + var input; + + callback = utils.asyncify(callback); + + if (tx.isCoinbase()) + return callback(null, tx); + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + + if (input.coin) + continue; + + try { + input.coin = this.getCoinSync(input.prevout.hash, input.prevout.index); + } catch (e) { + return callback(e); + } + } + + return callback(null, tx); }; Mempool.prototype.has = Mempool.prototype.hasTX = function hasTX(hash, callback) { - return this.get(hash, function(err, tx) { - if (err) - return callback(err); - return callback(null, !!tx); - }); + callback = utils.asyncify(callback); + + if (hash instanceof bcoin.tx) + hash = hash.hash('hex'); + + return callback(null, !!this.txs[hash]); }; Mempool.prototype.add = @@ -348,7 +470,7 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) { 0)); } - self.tx.isDoubleSpend(tx, function(err, doubleSpend) { + self.isDoubleSpend(tx, function(err, doubleSpend) { if (err) return callback(err); @@ -364,7 +486,7 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) { return callback(err); if (!tx.hasCoins()) { - if (self.size > Mempool.MAX_MEMPOOL_SIZE) { + if (self.totalSize > Mempool.MAX_MEMPOOL_SIZE) { return callback(new VerifyError(tx, 'insufficientfee', 'mempool full', @@ -398,46 +520,195 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) { }); }; +function AddressMap() { + this.map = {}; +} + +AddressMap.prototype.getKeys = function getKeys(address) { + return this.map[address] || []; +}; + +AddressMap.prototype.addTX = function addTX(tx) { + var hash = tx.hash('hex'); + var addresses = tx.getAddresses(); + var address; + var i; + + for (i = 0; i < addresses.length; i++) { + address = addresses[i]; + if (!this.map[address]) + this.map[address] = []; + this.map[address].push(hash); + } +}; + +function binarySearch(items, key, insert) { + var start = 0; + var end = items.length - 1; + var pos, cmp; + + while (start <= end) { + pos = (start + end) >>> 1; + cmp = utils.cmp(items[pos], key); + + if (cmp === 0) + return pos; + + if (cmp < 0) + start = pos + 1; + else + end = pos - 1; + } + + if (!insert) + return -1; + + if (start === 0) + return 0; + + return start - 1; +} + +AddressMap.prototype.removeTX = function removeTX(tx) { + var addresses = tx.getAddresses(); + var address; + var map, i; + + for (i = 0; i < addresses.length; i++) { + address = addresses[i]; + map = this.map[address]; + + if (map) { + i = map.indexOf(hash); + if (i !== -1) + map.splice(i , 1); + if (map.length === 0) + delete this.map[address]; + } + } +}; + +AddressMap.prototype.addCoin = function addCoin(coin) { + var address = coin.getAddress(); + var key = coin.hash + '/' + coin.index; + + if (address) { + if (!this.map[address]) + this.map[address] = []; + this.map[address].push(key); + } +}; + +AddressMap.prototype.removeCoin = function removeCoin(tx, i) { + var address, key, map, i; + + if (tx instanceof bcoin.input) { + address = tx.getAddress(); + key = tx.prevout.hash + '/' + tx.prevout.index; + } else { + address = tx.outputs[i].getAddress(); + key = tx.hash('hex') + '/' + i; + } + + map = this.map[address]; + + if (map) { + i = map.indexOf(key); + if (i !== -1) + map.splice(i, 1); + if (map.length === 0) + delete this.map[address]; + } +}; + Mempool.prototype.addUnchecked = function addUnchecked(tx, callback) { var self = this; - this.tx.addUnchecked(tx, function(err) { + var hash = tx.hash('hex'); + + this.txs[hash] = tx.toExtended(); + + this.addressMap.addTX(tx); + + tx.inputs.forEach(function(input, i) { + var key = input.prevout.hash + '/' + input.prevout.index; + delete self.coins[key]; + self.spent[key] = hash; + self.addressMap.removeCoin(input); + }); + + tx.outputs.forEach(function(output, i) { + var coin = bcoin.coin(tx, i); + var key = coin.hash + '/' + coin.index; + self.coins[key] = coin.toRaw(); + self.addressMap.addCoin(coin); + }); + + self.totalSize += tx.getSize(); + self.emit('tx', tx); + self.emit('add tx', tx); + + utils.debug('Added tx %s to the mempool.', tx.rhash); + + if (self.options.relay) + self.node.broadcast(tx); + + self.resolveOrphans(tx, function(err, resolved) { if (err) return callback(err); - self.size += tx.getSize(); - self.emit('tx', tx); - self.emit('add tx', tx); - - utils.debug('Added tx %s to the mempool.', tx.rhash); - - if (self.options.relay) - self.node.broadcast(tx); - - self.resolveOrphans(tx, function(err, resolved) { - if (err) - return callback(err); - - utils.forEachSerial(resolved, function(tx, next) { - self.addUnchecked(tx, function(err) { - if (err) - self.emit('error', err); - utils.debug('Resolved orphan %s in mempool.', tx.rhash); - next(); - }, true); - }, callback); - }); + utils.forEachSerial(resolved, function(tx, next) { + self.addUnchecked(tx, function(err) { + if (err) + self.emit('error', err); + utils.debug('Resolved orphan %s in mempool.', tx.rhash); + next(); + }, true); + }, callback); }); }; Mempool.prototype.removeUnchecked = function removeUnchecked(tx, callback) { var self = this; - this.tx.removeUnchecked(tx, function(err) { - if (err) - return callback(err); - self.size -= tx.getSize(); - self.emit('remove tx', tx); + var hash; + + callback = utils.asyncify(callback); + + try { + tx = this.getTXSync(tx); + } catch (e) { + return callback(e); + } + + if (!tx) return callback(); + + hash = tx.hash('hex'); + + delete this.txs[hash]; + delete this.orphans[hash]; + + this.addressMap.removeTX(tx); + + tx.inputs.forEach(function(input, i) { + var key = input.prevout.hash + '/' + input.prevout.index; + delete self.coins[key]; + delete self.spent[key]; + self.addressMap.removeCoin(input); }); + + tx.outputs.forEach(function(output, i) { + var address = output.getAddress(); + var key = hash + '/' + i; + + delete self.coins[key]; + delete self.spent[key]; + self.addressMap.removeCoin(tx, i); + }); + + self.totalSize -= tx.getSize(); + self.emit('remove tx', tx); + + return callback(); }; Mempool.prototype.verify = function verify(tx, callback) { @@ -565,38 +836,81 @@ Mempool.prototype.verify = function verify(tx, callback) { }); }; -Mempool.prototype.countAncestors = function countAncestors(tx, callback) { +Mempool.prototype.countAncestorsSync = function countAncestorsSync(tx) { var self = this; var inputs = new Array(tx.inputs.length); - utils.forEachSerial(tx.inputs, function(input, next, i) { + var i, input, prev; + + for (i = 0; i < tx.inputs.length; i++) { + input = tx.inputs[i]; + prev = self.getTXSync(input.prevout.hash); inputs[i] = 0; - self.getTX(input.prevout.hash, function(err, tx) { - if (err) - return next(err); - if (!tx) - return next(); + if (!prev) + continue; - inputs[i] += 1; + inputs[i] += 1; + inputs[i] += self.countAncestorsSync(prev); + } - self.countAncestors(tx, function(err, max) { - if (err) - return next(err); - - inputs[i] += max; - - next(); - }); - }); - }, function(err) { - if (err) - return callback(err); - - return callback(null, inputs.sort().pop()); - }); + return inputs.sort().pop(); }; -Mempool.prototype._hasTX = function hasTX(tx, callback, force) { +Mempool.prototype.countAncestors = function countAncestors(tx, callback) { + var count; + + callback = utils.asyncify(callback); + + try { + count = this.countAncestorsSync(tx); + } catch (e) { + return callback(e); + } + + return callback(null, count); +}; + +Mempool.prototype.hasOrphanSync = function hasOrphanSync(hash) { + if (hash instanceof bcoin.tx) + hash = hash.hash('hex'); + + return !!this.orphans[hash]; +}; + +Mempool.prototype.getOrphanSync = function getOrphanSync(hash) { + var orphan; + + if (hash instanceof bcoin.tx) + hash = hash.hash('hex'); + + orphan = this.orphans[hash]; + + if (!orphan) + return callback(); + + return bcoin.tx.fromExtended(orphan, true); +}; + +Mempool.prototype.hasOrphan = function hasOrphan(hash, callback) { + callback = utils.asyncify(callback); + return callback(null, this.hasOrphanSync(hash)); +}; + +Mempool.prototype.getOrphan = function getOrphan(hash, callback) { + var orphan; + + callback = utils.asyncify(callback); + + try { + orphan = this.getOrphanSync(hash); + } catch (e) { + return callback(e); + } + + return callback(null, orphan); +}; + +Mempool.prototype._hasTX = function hasTX(tx, callback) { var self = this; var hash = tx.hash('hex'); @@ -607,160 +921,159 @@ Mempool.prototype._hasTX = function hasTX(tx, callback, force) { if (result) return callback(null, result); - self.db.get('m/D/' + hash, function(err, tx) { - if (err && err.type !== 'NotFoundError') + self.hasOrphan(hash, function(err, result) { + if (err) return callback(err); - return callback(null, !!tx); + return callback(null, result); }); }); }; -Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { +Mempool.prototype.storeOrphan = function storeOrphan(tx, callback) { var self = this; - var outputs = {}; - var batch = this.db.batch(); + var prevout = {}; var hash = tx.hash('hex'); var i, input, p; for (i = 0; i < tx.inputs.length; i++) { input = tx.inputs[i]; if (!input.coin) - outputs[input.prevout.hash] = true; + prevout[input.prevout.hash] = true; } - outputs = Object.keys(outputs); + prevout = Object.keys(prevout); - assert(outputs.length > 0); + assert(prevout.length > 0); - utils.forEachSerial(outputs, function(key, next) { - self.db.get('m/d/' + key, function(err, buf) { - if (err && err.type !== 'NotFoundError') - return next(err); + prevout.forEach(function(key) { + if (!self.waiting[key]) + self.waiting[key] = []; - p = new BufferWriter(); - - if (buf) - p.writeBytes(buf); - - p.writeHash(hash); - - batch.put('m/d/' + key, p.render()); - - next(); - }); - }, function(err) { - if (err) - return callback(err); - - self.orphans++; - - batch.put('m/D/' + hash, tx.toExtended(true)); - - if (self.orphans > Mempool.MAX_ORPHAN_TX) { - return self.purgeOrphans(function(err) { - if (err) - return callback(err); - batch.write(callback); - }); - } - - batch.write(callback); + self.waiting[key].push(hash); }); + + self.totalOrphans++; + + self.orphans[hash] = tx.toExtended(true); + + if (self.totalOrphans > Mempool.MAX_ORPHAN_TX) + return self.purgeOrphans(callback); + + return utils.nextTick(callback); }; Mempool.prototype.getBalance = function getBalance(callback) { - return this.tx.getBalance(callback); -}; + var coins = []; + var hashes = Object.keys(this.coins); + var parts, hash, index, i, coin; + var unconfirmed = new bn(0); + var confirmed = new bn(0); -Mempool.prototype.getAll = function getAll(callback) { - return this.tx.getAll(callback); -}; + callback = utils.asyncify(callback); -Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) { - var self = this; - var hash = tx.hash('hex'); - var hashes = []; - var resolved = []; - var batch = this.db.batch(); - var p; - - this.db.get('m/d/' + hash, function(err, buf) { - if (err && err.type !== 'NotFoundError') - return callback(err); - - if (!buf) - return callback(null, resolved); - - p = new BufferReader(buf); - - p.start(); + for (i = 0; i < hashes.length; i++) { + parts = hashes[i].split('/'); + hash = parts[0]; + index = +parts[1]; try { - while (p.left()) - hashes.push(p.readHash('hex')); + coin = this.getCoinSync(hash, index); } catch (e) { return callback(e); } - p.end(); + coins.push(coin); + } - utils.forEachSerial(hashes, function(orphanHash, next, i) { - self.db.get('m/D/' + orphanHash, function(err, orphan) { - if (err && err.type !== 'NotFoundError') - return next(err); + for (i = 0; i < coins.length; i++) { + coin = coins[i]; + if (coin.height !== -1) + confirmed.iadd(coin.value); + unconfirmed.iadd(coin.value); + } - if (!orphan) - return next(); + return callback(null, { + unconfirmed: unconfirmed, + confirmed: confirmed + }); +}; - try { - orphan = bcoin.tx.fromExtended(orphan, true); - } catch (e) { - return next(e); - } +Mempool.prototype.getAll = function getAll(callback) { + var txs = []; + var hashes = Object.keys(this.txs); + var i, tx; - orphan.inputs.forEach(function(input) { - if (!input.coin && input.prevout.hash === hash) - input.coin = bcoin.coin(tx, input.prevout.index); - }); + callback = utils.asyncify(callback); - if (orphan.hasCoins()) { - self.orphans--; - batch.del('m/D/' + orphanHash); - return self.verify(orphan, function(err) { - if (err) { - if (err.type === 'VerifyError') - return next(); - return next(err); - } - resolved.push(orphan); - return next(); - }); - } + for (i = 0; i < hashes.length; i++) { + try { + tx = this.getTXSync(hashes[i]); + } catch (e) { + return callback(e); + } - batch.put('m/D/' + orphanHash, orphan.toExtended(true)); - next(); - }); - }, function(err) { - if (err) - return callback(err); + txs.push(tx); + } - function done(err) { - if (err) - return callback(err); + return callback(null, txs); +}; - return callback(null, resolved); - } +Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback) { + var self = this; + var hash = tx.hash('hex'); + var hashes = this.waiting[hash]; + var resolved = []; - batch.del('m/d/' + hash); + if (!hashes) + return callback(null, resolved); - return batch.write(done); + utils.forEachSerial(hashes, function(orphanHash, next, i) { + var orphan = self.orphans[orphanHash]; + + if (!orphan) + return next(); + + try { + orphan = bcoin.tx.fromExtended(orphan, true); + } catch (e) { + return next(e); + } + + orphan.inputs.forEach(function(input) { + if (!input.coin && input.prevout.hash === hash) + input.coin = bcoin.coin(tx, input.prevout.index); }); + + if (orphan.hasCoins()) { + self.totalOrphans--; + delete self.orphans[orphanHash]; + return self.verify(orphan, function(err) { + if (err) { + if (err.type === 'VerifyError') + return next(); + return next(err); + } + resolved.push(orphan); + return next(); + }); + } + + self.orphans[orphanHash] = orphan.toExtended(true); + + next(); + }, function(err) { + if (err) + return callback(err); + + delete self.waiting[hash]; + + return callback(null, resolved); }); }; Mempool.prototype.getSnapshot = function getSnapshot(callback) { - return this.tx.getAllHashes(callback); + return utils.asyncify(callback)(null, Object.keys(this.txs)); }; Mempool.prototype.checkLocks = function checkLocks(tx, flags, callback) {