diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index c425ec0c..9bc7b1d4 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -10,8 +10,6 @@ module.exports = function(bcoin) { /* * Database Layout: * (inherits all from txdb) - * o/[hash] -> orphan/waiting hashes - * O/[hash] -> orphan tx */ var EventEmitter = require('events').EventEmitter; @@ -43,7 +41,7 @@ var DUMMY = new Buffer([0]); * @property {Boolean} loaded * @property {Object} db * @property {Number} size - * @property {Number} orphans + * @property {Number} totalOrphans * @property {Locker} locker * @property {Number} freeCount * @property {Number} lastTime @@ -76,7 +74,9 @@ function Mempool(options) { this.db = null; this.tx = null; this.size = 0; - this.orphans = 0; + this.waiting = {}; + this.orphans = {}; + this.totalOrphans = 0; this.spent = 0; this.total = 0; @@ -406,47 +406,14 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) { Mempool.prototype.pruneOrphans = function pruneOrphans(callback) { var self = this; - var hashes = []; - var iter; callback = utils.ensure(callback); - iter = this.db.iterator({ - gte: 'O', - lte: 'O~', - keys: true, - values: false, - fillCache: false, - keyAsBuffer: false - }); - - function done(err) { - if (err) - return callback(err); - - utils.forEachSerial(hashes, function(hash, next) { - if (self.orphans <= constants.mempool.MAX_ORPHAN_TX) - return next(); - self.removeOrphan(hash, next); - }, callback); - } - - (function next() { - iter.next(function(err, key, value) { - if (err) { - return iter.end(function() { - callback(err); - }); - } - - if (key === undefined) - return iter.end(done); - - hashes.push(key.split('/')[1]); - - next(); - }); - })(); + utils.forEachSerial(Object.keys(this.orphans), function(hash, next) { + if (self.totalOrphans <= constants.mempool.MAX_ORPHAN_TX) + return next(); + self.removeOrphan(hash, next); + }, callback); }; /** @@ -1020,7 +987,7 @@ Mempool.prototype.countAncestors = function countAncestors(tx, callback) { Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { var self = this; var prevout = {}; - var i, hash, batch, input, p; + var i, hash, batch, input, prev; if (tx.getSize() > 5000) { bcoin.debug('Ignoring large orphan: %s', tx.rhash); @@ -1028,7 +995,6 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { } hash = tx.hash('hex'); - batch = this.db.batch(); for (i = 0; i < tx.inputs.length; i++) { input = tx.inputs[i]; @@ -1040,40 +1006,25 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { assert(prevout.length > 0); - utils.forEachSerial(prevout, function(prev, next) { - self.getWaiting(prev, function(err, orphans, buf) { + for (i = 0; i < prevout.length; i++) { + prev = prevout[i]; + if (!this.waiting[prev]) + this.waiting[prev] = []; + this.waiting[prev].push(hash); + } + + this.orphans[hash] = tx.toExtended(true); + this.totalOrphans++; + + if (self.totalOrphans > constants.mempool.MAX_ORPHAN_TX) { + return self.pruneOrphans(function(err) { if (err) - return next(err); - - p = new BufferWriter(); - - if (buf) - p.writeBytes(buf); - - p.writeHash(hash); - - batch.put('o/' + prev, p.render()); - - next(); + return callback(err); + callback(); }); - }, function(err) { - if (err) - return callback(err); + } - self.orphans++; - - batch.put('O/' + hash, tx.toExtended(true)); - - if (self.orphans > constants.mempool.MAX_ORPHAN_TX) { - return self.pruneOrphans(function(err) { - if (err) - return callback(err); - batch.write(callback); - }); - } - - batch.write(callback); - }); + return callback(); }; /** @@ -1101,28 +1052,7 @@ Mempool.prototype.getHistory = function getHistory(callback) { */ Mempool.prototype.getWaiting = function getWaiting(hash, callback) { - var self = this; - var hashes = []; - var p; - - this.db.get('o/' + hash, function(err, buf) { - if (err && err.type !== 'NotFoundError') - return callback(err); - - if (!buf) - return callback(null, hashes, buf); - - p = new BufferReader(buf); - - try { - while (p.left()) - hashes.push(p.readHash('hex')); - } catch (e) { - return callback(e); - } - - return callback(null, hashes, buf); - }); + return callback(null, this.waiting[hash] || []); }; /** @@ -1133,22 +1063,18 @@ Mempool.prototype.getWaiting = function getWaiting(hash, callback) { Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) { var self = this; + var orphan = this.orphans[orphanHash]; - this.db.get('O/' + orphanHash, function(err, orphan) { - if (err && err.type !== 'NotFoundError') - return callback(err); + if (!orphan) + return callback(); - if (!orphan) - return callback(); + try { + orphan = bcoin.tx.fromExtended(orphan, true); + } catch (e) { + return callback(e); + } - try { - orphan = bcoin.tx.fromExtended(orphan, true); - } catch (e) { - return callback(e); - } - - return callback(null, orphan); - }); + return callback(null, orphan); }; /** @@ -1157,12 +1083,7 @@ Mempool.prototype.getOrphan = function getOrphan(orphanHash, callback) { */ Mempool.prototype.hasOrphan = function hasOrphan(orphanHash, callback) { - this.db.get('O/' + orphanHash, function(err, orphan) { - if (err && err.type !== 'NotFoundError') - return callback(err); - - return callback(null, orphan != null); - }); + return callback(null, this.orphans[orphanHash] != null); }; /** @@ -1178,7 +1099,6 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) var self = this; var hash = tx.hash('hex'); var resolved = []; - var batch = this.db.batch(); this.getWaiting(hash, function(err, hashes) { if (err) @@ -1195,29 +1115,23 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) orphan.fillCoins(tx); if (orphan.hasCoins()) { - self.orphans--; - batch.del('O/' + orphanHash); + self.totalOrphans--; + delete self.orphans[orphanHash]; resolved.push(orphan); return next(); } - batch.put('O/' + orphanHash, orphan.toExtended(true)); + self.orphans[orphanHash] = orphan.toExtended(true); + next(); }); }, function(err) { if (err) return callback(err); - function done(err) { - if (err) - return callback(err); + delete self.waiting[hash]; - return callback(null, resolved); - } - - batch.del('o/' + hash); - - return batch.write(done); + return callback(null, resolved); }); }); }; @@ -1230,7 +1144,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) { var self = this; - var batch, prevout, hash; + var prevout, hash; function getOrphan(tx, callback) { if (typeof tx === 'string') @@ -1245,14 +1159,10 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) { if (!tx) return callback(); - batch = self.db.batch(); - hash = tx.hash('hex'); prevout = tx.getPrevout(); - batch.del('O/' + hash); - - utils.forEach(prevout, function(prev, next) { + utils.forEachSerial(prevout, function(prev, next) { var i, p; self.getWaiting(prev, function(err, hashes) { if (err) @@ -1266,16 +1176,11 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) { hashes.splice(i, 1); if (hashes.length === 0) { - batch.del('o/' + prev); + delete self.waiting[prev]; return next(); } - p = new BufferWriter(); - - for (i = 0; i < hashes.length; i++) - p.writeHash(hashes[i]); - - batch.put('o/' + prev, p.render()); + self.waiting[prev] = hashes; next(); }); @@ -1283,9 +1188,10 @@ Mempool.prototype.removeOrphan = function removeOrphan(tx, callback) { if (err) return callback(err); - self.orphans--; + delete self.orphans[hash]; + self.totalOrphans--; - batch.write(callback); + callback(); }); }); };