From 37c488c802575b44854178d5092a957bd6a60b70 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Sat, 26 Mar 2016 01:29:04 -0700 Subject: [PATCH] txdb and mempool work. --- lib/bcoin/fullnode.js | 33 ++-- lib/bcoin/mempool.js | 119 +++++++------- lib/bcoin/txdb.js | 354 ++++++++++++++++++++++++++---------------- test/wallet-test.js | 19 ++- 4 files changed, 303 insertions(+), 222 deletions(-) diff --git a/lib/bcoin/fullnode.js b/lib/bcoin/fullnode.js index c042f554..e5f6c435 100644 --- a/lib/bcoin/fullnode.js +++ b/lib/bcoin/fullnode.js @@ -118,12 +118,21 @@ Fullnode.prototype._init = function _init() { // Update the mempool. this.chain.on('add block', function(block) { - self.mempool.addBlock(block); + self.mempool.addBlock(block, function(err) { + if (err) + self.emit('error', err); + }); }); this.chain.on('remove block', function(block) { - self.mempool.removeBlock(block); - self.walletdb.removeBlock(block); + self.mempool.removeBlock(block, function(err) { + if (err) + self.emit('error', err); + }); + self.walletdb.removeBlock(block, function(err) { + if (err) + self.emit('error', err); + }); }); function load(err) { @@ -149,7 +158,7 @@ Fullnode.prototype._init = function _init() { self.createWallet(options, function(err, wallet) { if (err) - throw err; + return self.emit('error', err); // Set the miner payout address if the // programmer didn't pass one in. @@ -340,22 +349,14 @@ Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback) Fullnode.prototype.fillCoin = function fillCoin(tx, callback) { var self = this; - this.mempool.tx.isDoubleSpend(tx, function(err, result) { + this.mempool.fillCoin(tx, function(err) { if (err) return callback(err); - if (result) - return callback(null, tx, true); + if (tx.hasPrevout()) + return callback(null, tx); - self.mempool.fillCoin(tx, function(err) { - if (err) - return callback(err); - - if (tx.hasPrevout()) - return callback(null, tx); - - self.chain.db.fillCoin(tx, callback); - }); + self.chain.db.fillCoin(tx, callback); }); }; diff --git a/lib/bcoin/mempool.js b/lib/bcoin/mempool.js index 00408e50..ddfe0d1a 100644 --- a/lib/bcoin/mempool.js +++ b/lib/bcoin/mempool.js @@ -31,7 +31,6 @@ function Mempool(node, options) { this.options = options; this.node = node; this.chain = node.chain; - // this.db = node.chain.db.db; this.db = bcoin.ldb('mempool', { db: 'memdown' @@ -100,22 +99,34 @@ Mempool.prototype.open = function open(callback) { return this.db.open(callback); }; -Mempool.prototype.addBlock = function addBlock(block, callback) { +Mempool.prototype.addBlock = function addBlock(block, callback, force) { var self = this; - callback = utils.ensure(callback); - // Remove now-mined transactions - // XXX should batch this - utils.forEachSerial(block.txs, function(tx, next) { - self.tx.remove(tx, next); - }, callback); + var unlock = this._lock(addBlock, [block, callback], force); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); + + this.open(function(err) { + if (err) + return callback(err); + + utils.forEachSerial(block.txs, function(tx, next) { + self.tx.removeUnchecked(tx, next); + }, callback); + }); }; -Mempool.prototype.removeBlock = function removeBlock(block, callback) { +Mempool.prototype.removeBlock = function removeBlock(block, callback, force) { var self = this; - callback = utils.ensure(callback); - // XXX should batch this + var unlock = this._lock(removeBlock, [block, callback], force); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); + utils.forEachSerial(block.txs.slice().reverse(), function(tx, next) { - self.tx.add(tx, next); + self.tx.addUnchecked(tx, next); }, callback); }; @@ -208,7 +219,7 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) { if (exists) return callback(); - self.node.fillCoin(tx, function(err, tx, doubleSpend) { + self.tx.isDoubleSpend(tx, function(err, doubleSpend) { if (err) return callback(err); @@ -217,17 +228,22 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) { return callback(new VerifyError('bad-txns-inputs-spent', 0)); } - if (!tx.hasPrevout()) - return self.storeOrphan(tx, callback); - - self.verify(tx, function(err) { - if (err) { - if (err.type === 'VerifyError' && err.score >= 0) - peer.sendReject(tx, err.reason, err.score); + self.node.fillCoin(tx, function(err) { + if (err) return callback(err); - } - self.addUnchecked(tx, peer, callback); + if (!tx.hasPrevout()) + return self.storeOrphan(tx, callback); + + self.verify(tx, function(err) { + if (err) { + if (err.type === 'VerifyError' && err.score >= 0) + peer.sendReject(tx, err.reason, err.score); + return callback(err); + } + + self.addUnchecked(tx, peer, callback); + }); }); }); }); @@ -235,7 +251,7 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) { Mempool.prototype.addUnchecked = function addUnchecked(tx, peer, callback) { var self = this; - self.tx.add(tx, function(err) { + self.tx.addUnchecked(tx, function(err) { if (err) return callback(err); @@ -351,7 +367,7 @@ Mempool.prototype._hasTX = function hasTX(tx, callback, force) { if (result) return callback(null, result); - self.db.get('D/' + hash, function(err, tx) { + self.db.get('m/D/' + hash, function(err, tx) { if (err && err.type !== 'NotFoundError') return callback(err); @@ -378,7 +394,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { assert(outputs.length > 0); utils.forEachSerial(outputs, function(key, next) { - self.db.get('d/' + key, function(err, buf) { + self.db.get('m/d/' + key, function(err, buf) { if (err && err.type !== 'NotFoundError') return next(err); @@ -389,7 +405,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { p.writeHash(hash); - batch.put('d/' + key, p.render()); + batch.put('m/d/' + key, p.render()); next(); }); @@ -397,7 +413,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) { if (err) return callback(err); - batch.put('D/' + hash, tx.toExtended(true)); + batch.put('m/D/' + hash, tx.toExtended(true)); batch.write(callback); }); }; @@ -418,7 +434,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) var batch = this.db.batch(); var p; - this.db.get('d/' + hash, function(err, buf) { + this.db.get('m/d/' + hash, function(err, buf) { if (err && err.type !== 'NotFoundError') return callback(err); @@ -439,7 +455,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) p.end(); utils.forEachSerial(hashes, function(orphanHash, next, i) { - self.db.get('D/' + orphanHash, function(err, orphan) { + self.db.get('m/D/' + orphanHash, function(err, orphan) { if (err && err.type !== 'NotFoundError') return next(err); @@ -452,10 +468,13 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) return next(e); } - orphan.fillPrevout(tx); + orphan.inputs.forEach(function(input) { + if (!input.output && input.prevout.hash === hash) + input.output = bcoin.coin(tx, input.prevout.index); + }); if (orphan.hasPrevout()) { - batch.del('D/' + orphanHash); + batch.del('m/D/' + orphanHash); return self.verify(orphan, function(err) { if (err) { if (err.type === 'VerifyError') @@ -467,7 +486,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) }); } - batch.put('D/' + orphanHash, orphan.toExtended(true)); + batch.put('m/D/' + orphanHash, orphan.toExtended(true)); next(); }); }, function(err) { @@ -481,7 +500,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force) return callback(null, resolved); } - batch.del('d/' + hash); + batch.del('m/d/' + hash); return batch.write(done); }); @@ -492,40 +511,6 @@ Mempool.prototype.getInv = function getInv(callback) { return this.tx.getAllHashes(callback); }; -Mempool.prototype.remove = -Mempool.prototype.removeTX = function removeTX(hash, callback, force) { - var self = this; - - var unlock = this._lock(removeTX, [hash, callback], force); - if (!unlock) - return; - - function getTX() { - if (hash instanceof bcoin.tx) - return callback(null, hash); - - return self.getTX(hash, function(err, tx) { - if (err) - return callback(err); - if (!tx) - return callback(); - return self.node.fillTX(tx, callback); - }); - } - - return getTX(function(err, tx) { - if (err) - return callback(err); - - self.tx.remove(tx, function(err) { - if (err) - return callback(err); - - self.emit('remove tx', tx); - }); - }); -}; - Mempool.prototype.checkTX = function checkTX(tx, peer) { return Mempool.checkTX(tx, peer); }; diff --git a/lib/bcoin/txdb.js b/lib/bcoin/txdb.js index 1f8d6559..3d6998b6 100644 --- a/lib/bcoin/txdb.js +++ b/lib/bcoin/txdb.js @@ -30,6 +30,7 @@ function TXPool(prefix, db, options) { this.options = options; this.busy = false; this.jobs = []; + this.locker = new bcoin.locker(this); if (this.options.mapAddress) this.options.indexAddress = true; @@ -38,40 +39,7 @@ function TXPool(prefix, db, options) { utils.inherits(TXPool, EventEmitter); TXPool.prototype._lock = function _lock(func, args, force) { - var self = this; - var called; - - if (force) { - assert(this.busy); - return function unlock() { - assert(!called); - called = true; - }; - } - - if (this.busy) { - this.jobs.push([func, args]); - return; - } - - this.busy = true; - - return function unlock() { - var item; - - assert(!called); - called = true; - - self.busy = false; - - if (self.jobs.length === 0) { - self.emit('flush'); - return; - } - - item = self.jobs.shift(); - item[0].apply(self, item[1]); - }; + return this.locker.lock(func, args, force); }; TXPool.prototype.getMap = function getMap(tx, callback) { @@ -325,20 +293,24 @@ TXPool.prototype._add = function add(tx, map, callback, force) { // Consume unspent money or add orphans utils.forEachSerial(tx.inputs, function(input, next, i) { - var key; + var key, address; if (tx.isCoinbase()) return next(); - key = input.prevout.hash + '/' + input.prevout.index; + address = input.getAddress(); + + // Only add orphans if this input is ours. + if (self.options.mapAddress) { + if (!address || !map[address].length) + return next(); + } self.getCoin(input.prevout.hash, input.prevout.index, function(err, coin) { - var address; - if (err) return next(err); - address = input.getAddress(); + key = input.prevout.hash + '/' + input.prevout.index; if (coin) { // Add TX to inputs and spend money @@ -354,50 +326,52 @@ TXPool.prototype._add = function add(tx, map, callback, force) { if (self.options.indexAddress && address) { map[address].forEach(function(id) { - batch.del( - prefix + 'u/a/' + id - + '/' + input.prevout.hash - + '/' + input.prevout.index); + batch.del(prefix + 'u/a/' + id + '/' + key); }); } - batch.del( - prefix + 'u/t/' - + input.prevout.hash - + '/' + input.prevout.index); - - if (self.options.indexSpent) { - batch.put( - prefix + 's/t/' - + input.prevout.hash - + '/' + input.prevout.index, - tx.hash()); - } + batch.del(prefix + 'u/t/' + key); + batch.put(prefix + 's/t/' + key, tx.hash()); return next(); } input.output = null; - // Only add orphans if this input is ours. - if (self.options.mapAddress) { - if (!address || !map[address].length) - return next(); - } - - self.isSpent(input.prevout.hash, input.prevout.index, function(err, result) { + self.isSpent(input.prevout.hash, input.prevout.index, function(err, spentBy) { if (err) return next(err); // Are we double-spending? - if (result) { - if (!self.options.indexSpent) - return next(new Error('Transaction is double-spending.')); - return self._removeSpenders(hash, function(err) { + // Replace older txs with newer ones. + if (spentBy) { + return self.getTX(input.prevout.hash, function(err, prev) { if (err) return next(err); - batch.clear(); - self._add(tx, map, callback, true); + + if (!prev) + return callback(new Error('Could not find double-spent coin.')); + + input.output = bcoin.coin(prev, input.prevout.index); + + // Skip invalid transactions + if (self.options.verify) { + if (!tx.verify(i)) + return callback(null, false); + } + + return self._removeSpenders(spentBy, tx, function(err, result) { + if (err) + return next(err); + + if (!result) { + assert(tx.ts === 0, 'I\'m confused'); + return callback(null, false); + } + + batch.clear(); + self._add(tx, map, callback, true); + }); }); } @@ -525,7 +499,7 @@ TXPool.prototype._add = function add(tx, map, callback, force) { }, true); }; -TXPool.prototype._removeSpenders = function removeSpenders(hash, callback) { +TXPool.prototype._removeSpenders = function removeSpenders(hash, ref, callback) { var self = this; this.getTX(hash, function(err, tx) { if (err) @@ -535,54 +509,31 @@ TXPool.prototype._removeSpenders = function removeSpenders(hash, callback) { return callback(new Error('Could not find spender.')); if (tx.ts !== 0) - return callback(new Error('Transaction is double-spending.')); + return callback(null, false); - utils.forEachSerial(tx.outputs, function(next, output, i) { + if (ref.ts === 0 && ref.ps < ts.ps) + return callback(null, false); + + utils.forEachSerial(tx.outputs, function(output, next, i) { self.isSpent(hash, i, function(err, spent) { if (err) return next(err); if (spent) - return self._removeSpenders(spent, next); + return self._removeSpenders(spent, ref, next); next(); }); }, function(err) { if (err) return callback(err); - return self.lazyRemove(tx, callback, true); + return self.lazyRemove(tx, function(err) { + if (err) + return callback(err); + return callback(null, true); + }, true); }); }); }; -TXPool.prototype._collectSpenders = function _collectSpenders(tx, callback, spenders) { - var self = this; - var hash = tx.hash('hex'); - - if (!spenders) - spenders = []; - - utils.forEachSerial(tx.outputs, function(next, output, i) { - self.isSpent(hash, i, function(err, spentBy) { - if (err) - return next(err); - if (spentBy) { - spenders.push(spentBy); - return self.getTX(spentBy, function(err, tx) { - if (err) - return next(err); - if (!tx) - return next(); - return self._removeSpenders(tx, next, spenders); - }); - } - next(); - }); - }, function(err) { - if (err) - return callback(err); - return callback(null, spenders); - }); -}; - TXPool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { var self = this; utils.everySerial(tx.inputs, function(input, next) { @@ -601,36 +552,19 @@ TXPool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { }); }; -TXPool.prototype.isSpent = function isSpent(hash, index, callback, checkCoin) { +TXPool.prototype.isSpent = function isSpent(hash, index, callback) { var self = this; + var prefix = this.prefix + '/'; + var key = prefix + 's/t/' + hash + '/' + index; - if (this.options.indexSpent) { - return this.db.get('s/t/' + hash + '/' + index, function(err, exists) { - if (err && err.type !== 'NotFoundError') - return callback(err); - - return callback(null, exists ? exists.toString('hex') : null); - }); - } - - function getCoin(callback) { - if (!checkCoin) - return callback(null, null); - return self.getCoin(hash, index, callback); - } - - return getCoin(function(err, coin) { - if (err) + return this.db.get(key, function(err, hash) { + if (err && err.type !== 'NotFoundError') return callback(err); - if (coin) - return callback(null, false); + if (!hash) + return callback(null, null); - return self.getTX(hash, function(err, tx) { - if (err) - return callback(err); - return callback(null, !!tx); - }); + return callback(null, utils.toHex(hash)); }); }; @@ -728,10 +662,6 @@ TXPool.prototype._confirm = function _confirm(tx, map, callback, force) { TXPool.prototype.remove = function remove(hash, callback, force) { var self = this; - var first = Array.isArray(hash) ? hash[0] : hash; - - if (!this.options.indexExtra && (first instanceof bcoin.tx)) - return this.lazyRemove(hash, callback, force); if (Array.isArray(hash)) { return utils.forEachSerial(hash, function(hash, next) { @@ -858,11 +788,9 @@ TXPool.prototype._remove = function remove(tx, map, callback, force) { + '/' + input.prevout.index, input.output.toRaw()); - if (self.options.indexSpent) { - batch.del(prefix + 's/t/' - + input.prevout.hash - + '/' + input.prevout.index); - } + batch.del(prefix + 's/t/' + + input.prevout.hash + + '/' + input.prevout.index); batch.del(prefix + 'o/' + input.prevout.hash + '/' + input.prevout.index); }); @@ -1603,6 +1531,158 @@ TXPool.prototype.getBalance = function getBalance(callback) { return this.getBalanceByAddress(null, callback); }; +TXPool.prototype.addUnchecked = function addUnchecked(tx, callback, force) { + var self = this; + var prefix = this.prefix + '/'; + var hash = tx.hash('hex'); + var batch; + + var unlock = this._lock(addUnchecked, [tx, callback], force); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); + + batch = this.db.batch(); + + batch.put(prefix + 't/t/' + hash, tx.toExtended()); + + tx.getAddresses().forEach(function(address) { + batch.put(prefix + 't/a/' + address + '/' + hash, DUMMY); + }); + + tx.inputs.forEach(function(input) { + var key = input.prevout.hash + '/' + input.prevout.index; + var address; + + if (tx.isCoinbase()) + return; + + assert(input.output); + address = input.getAddress(); + + batch.del(prefix + 'u/t/' + key); + batch.put(prefix + 's/t/' + key, tx.hash()); + + if (address) + batch.del(prefix + 'u/a/' + address + '/' + key); + }); + + tx.outputs.forEach(function(output, i) { + var key = hash + '/' + i; + var address = output.getAddress(); + var coin = bcoin.coin(tx, i).toRaw(); + batch.put(prefix + 'u/t/' + key, coin); + + if (address) + batch.put(prefix + 'u/a/' + address + '/' + key, DUMMY); + }); + + return batch.write(function(err) { + if (err) + return callback(err); + self.emit('add tx', tx); + return callback(); + }); +}; + +TXPool.prototype.removeUnchecked = function removeUnchecked(tx, callback, force) { + var self = this; + var prefix = this.prefix + '/'; + var hash = tx.hash('hex'); + var batch; + + var unlock = this._lock(removeUnchecked, [tx, callback], force); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); + + batch = this.db.batch(); + + batch.del(prefix + 't/t/' + hash); + batch.del(prefix + 'D/' + hash); + + tx.getAddresses().forEach(function(address) { + batch.del(prefix + 't/a/' + address + '/' + hash); + }); + + tx.inputs.forEach(function(input) { + var key = input.prevout.hash + '/' + input.prevout.index; + var address; + + if (tx.isCoinbase()) + return; + + if (!input.output) + return; + + address = input.getAddress(); + + batch.del(prefix + 'u/t/' + key); + batch.del(prefix + 's/t/' + key); + + if (address) + batch.del(prefix + 'u/a/' + address + '/' + key); + }); + + tx.outputs.forEach(function(output, i) { + var key = hash + '/' + i; + var address = output.getAddress(); + + batch.del(prefix + 'u/t/' + key); + + if (address) + batch.del(prefix + 'u/a/' + address + '/' + key); + }); + + batch.write(function(err) { + if (err) + return callback(err); + self.emit('remove tx', tx); + return callback(); + }); +}; + +TXPool.prototype.zap = function zap(tip, callback, force) { + var self = this; + var now = tip.ts; + var age = 10 * 60 * 60; + + var unlock = this._lock(zap, [tip, callback], force); + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); + + // return this.getRange(null, { + // start: 0, + // end: now - age + // }, function(err, txs) { + // }); + + this.getPending(function(err, txs) { + if (err) + return callback(err); + + txs = txs.filter(function(tx) { + return now > tx.ps + age; + }); + + if (txs.length === 0) + return callback(); + + self.fillTX(txs, function(err) { + if (err) + return callback(err); + + utils.forEachSerial(txs, function(tx, next) { + self.lazyRemove(tx, next); + }, callback); + }); + }); +}; + /** * Expose */ diff --git a/test/wallet-test.js b/test/wallet-test.js index feb32655..fa2cab82 100644 --- a/test/wallet-test.js +++ b/test/wallet-test.js @@ -19,6 +19,7 @@ var dummyInput = { index: 0 }, script: new bcoin.script([]), + witness: new bcoin.script.witness([]), sequence: 0xffffffff }; @@ -131,11 +132,13 @@ describe('Wallet', function() { assert(tx.verify()); }); + var dw, di; it('should have TX pool and be serializable', function(cb) { wdb.create({}, function(err, w) { assert.noError(err); wdb.create({}, function(err, f) { assert.noError(err); + dw = w; // Coinbase var t1 = bcoin.mtx().addOutput(w, 50000).addOutput(w, 1000); @@ -145,6 +148,7 @@ describe('Wallet', function() { var t2 = bcoin.mtx().addInput(t1, 0) // 50000 .addOutput(w, 24000) .addOutput(w, 24000); + di = t2.inputs[0]; // balance: 49000 w.sign(t2); var t3 = bcoin.mtx().addInput(t1, 1) // 1000 @@ -179,8 +183,6 @@ describe('Wallet', function() { fake.hint = 'fake'; // Fake TX should temporarly change output - // wdb.addTX(fake); - wdb.addTX(fake, function(err) { assert.noError(err); wdb.addTX(t4, function(err) { @@ -234,6 +236,19 @@ describe('Wallet', function() { }); }); + it('should cleanup spenders after double-spend', function(cb) { + var t1 = bcoin.mtx().addOutput(dw, 5000); + t1.addInput(di); + wdb.addTX(t1, function(err) { + assert.noError(err); + dw.getBalance(function(err, balance) { + assert.noError(err); + assert.equal(balance.toString(10), '11000'); + cb(); + }); + }); + }); + it('should fill tx with inputs', function(cb) { wdb.create({}, function(err, w1) { assert.noError(err);