From 9c94fae2a8f44fe32ad0e94d5e2e8acd769e4497 Mon Sep 17 00:00:00 2001 From: Christopher Jeffrey Date: Thu, 11 Aug 2016 15:53:47 -0700 Subject: [PATCH] txdb: refactor batches. --- lib/bcoin/txdb.js | 629 +++++++++++++++++++++++++------------------- test/wallet-test.js | 32 ++- 2 files changed, 371 insertions(+), 290 deletions(-) diff --git a/lib/bcoin/txdb.js b/lib/bcoin/txdb.js index 3e6a00c2..1afa56a7 100644 --- a/lib/bcoin/txdb.js +++ b/lib/bcoin/txdb.js @@ -216,6 +216,7 @@ TXDB.prototype.mapAddresses = function mapAddresses(address, callback) { */ TXDB.prototype._addOrphan = function _addOrphan(key, hash, index, callback) { + var self = this; var p; this.db.get('o/' + key, function(err, buf) { @@ -229,7 +230,9 @@ TXDB.prototype._addOrphan = function _addOrphan(key, hash, index, callback) { bcoin.outpoint(hash, index).toRaw(p); - return callback(null, p.render()); + self.batch().put('o/' + key, p.render()); + + return callback(); }); }; @@ -396,6 +399,46 @@ TXDB.prototype.removeBlock = function removeBlock(block, callback, force) { }); }; +TXDB.prototype.start = function start() { + assert(!this.current); + this.current = this.db.batch(); + return this.current; +}; + +TXDB.prototype.put = function put(key, value) { + assert(this.current); + this.current.put(key, value); +}; + +TXDB.prototype.del = function del(key, value) { + assert(this.current); + this.current.del(key); +}; + +TXDB.prototype.batch = function batch() { + assert(this.current); + return this.current; +}; + +TXDB.prototype.drop = function drop() { + assert(this.current); + this.current.clear(); + this.current = null; +}; + +TXDB.prototype.commit = function commit(callback) { + var self = this; + assert(this.current); + this.current.write(function(err) { + if (err) { + self.current = null; + return callback(err); + } + self.current = null; + return callback(); + }); +}; + /** * Add a transaction to the database, map addresses * to wallet IDs, potentially store orphans, resolve @@ -406,6 +449,12 @@ TXDB.prototype.removeBlock = function removeBlock(block, callback, force) { TXDB.prototype.add = function add(tx, callback, force) { var self = this; + var unlock = this._lock(add, [tx, callback], force); + + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); return this.getInfo(tx, function(err, info) { if (err) @@ -420,22 +469,147 @@ TXDB.prototype.add = function add(tx, callback, force) { self.logger.debug(info.paths); - return self._add(tx, info, callback, force); + return self._add(tx, info, callback); }); }; -TXDB.prototype._add = function add(tx, info, callback, force) { +TXDB.prototype._verify = function _verify(tx, info, callback) { + var self = this; + utils.forEachSerial(tx.inputs, function(input, next, i) { + var prevout = input.prevout; + var address, paths; + + if (tx.isCoinbase()) + return next(); + + address = input.getHash('hex'); + paths = info.getPaths(address); + + // Only bother if this input is ours. + if (!paths) + return next(); + + self.getCoin(prevout.hash, prevout.index, function(err, coin) { + if (err) + return next(err); + + if (coin) { + // Add TX to inputs and spend money + input.coin = coin; + + // Skip invalid transactions + if (self.options.verify) { + if (!tx.verifyInput(i)) + return callback(null, false); + } + + return next(); + } + + input.coin = null; + + self.isSpent(prevout.hash, prevout.index, function(err, spent) { + if (err) + return next(err); + + // Are we double-spending? + // Replace older txs with newer ones. + if (!spent) + return next(); + + self.getTX(prevout.hash, function(err, prev) { + if (err) + return next(err); + + if (!prev) + return callback(new Error('Could not find double-spent coin.')); + + input.coin = bcoin.coin.fromTX(prev, prevout.index); + + // Skip invalid transactions + if (self.options.verify) { + if (!tx.verifyInput(i)) + return callback(null, false); + } + + self._removeConflict(spent.hash, tx, function(err, rtx, rinfo) { + if (err) + return next(err); + + // Spender was not removed, the current + // transaction is not elligible to be added. + if (!rtx) + return callback(null, false); + + self.emit('conflict', rtx, rinfo); + + next(); + }); + }); + }); + }); + }, function(err) { + if (err) + return callback(err); + return callback(null, true); + }); +}; + +TXDB.prototype._resolveOrphans = function _resolveOrphans(tx, i, callback) { + var self = this; + var batch = this.batch(); + var hash = tx.hash('hex'); + var output = tx.outputs[i]; + var key = hash + '/' + i; + var coin; + + this._getOrphans(key, function(err, orphans) { + if (err) + return callback(err); + + if (!orphans) + return callback(null, false); + + batch.del('o/' + key); + + coin = bcoin.coin.fromTX(tx, i); + + // Add input to orphan + utils.forEachSerial(orphans, function(pair, next) { + var input = pair[0]; + var orphan = pair[1]; + + // Probably removed by some other means. + if (!orphan) + return next(); + + orphan.inputs[input.index].coin = coin; + + assert(orphan.inputs[input.index].prevout.hash === hash); + assert(orphan.inputs[input.index].prevout.index === i); + + // Verify that input script is correct, if not - add + // output to unspent and remove orphan from storage + if (!self.options.verify || orphan.verifyInput(input.index)) { + batch.put('d/' + input.hash + '/' + pad32(input.index), coin.toRaw()); + return callback(null, true); + } + + self._lazyRemove(orphan, next); + }, function(err) { + if (err) + return next(err); + + return callback(null, false); + }); + }); +}; + +TXDB.prototype._add = function add(tx, info, callback) { var self = this; var updated = false; var batch, hash, i, j, unlock, path, paths, id; - unlock = this._lock(add, [tx, info, callback], force); - - if (!unlock) - return; - - callback = utils.wrap(callback, unlock); - if (tx.mutable) tx = tx.toTX(); @@ -448,248 +622,147 @@ TXDB.prototype._add = function add(tx, info, callback, force) { if (existing) return callback(null, true, info); - hash = tx.hash('hex'); + self._verify(tx, info, function(err, result) { + if (err) + return callback(err); - batch = self.db.batch(); + if (!result) + return callback(null, result, info); - batch.put('t/' + hash, tx.toExtended()); + hash = tx.hash('hex'); - if (tx.ts === 0) - batch.put('p/' + hash, DUMMY); - else - batch.put('h/' + pad32(tx.height) + '/' + hash, DUMMY); + self.start(); + batch = self.batch(); + batch.put('t/' + hash, tx.toExtended()); - batch.put('m/' + pad32(tx.ps) + '/' + hash, DUMMY); - - for (i = 0; i < info.keys.length; i++) { - id = info.keys[i]; - batch.put('T/' + id + '/' + hash, DUMMY); if (tx.ts === 0) - batch.put('P/' + id + '/' + hash, DUMMY); + batch.put('p/' + hash, DUMMY); else - batch.put('H/' + id + '/' + pad32(tx.height) + '/' + hash, DUMMY); - batch.put('M/' + id + '/' + pad32(tx.ps) + '/' + hash, DUMMY); - } + batch.put('h/' + pad32(tx.height) + '/' + hash, DUMMY); - // Consume unspent money or add orphans - utils.forEachSerial(tx.inputs, function(input, next, i) { - var prevout = input.prevout; - var key, address; + batch.put('m/' + pad32(tx.ps) + '/' + hash, DUMMY); - if (tx.isCoinbase()) - return next(); + for (i = 0; i < info.keys.length; i++) { + id = info.keys[i]; + batch.put('T/' + id + '/' + hash, DUMMY); + if (tx.ts === 0) + batch.put('P/' + id + '/' + hash, DUMMY); + else + batch.put('H/' + id + '/' + pad32(tx.height) + '/' + hash, DUMMY); + batch.put('M/' + id + '/' + pad32(tx.ps) + '/' + hash, DUMMY); + } - address = input.getHash('hex'); - paths = info.getPaths(address); + // Consume unspent money or add orphans + utils.forEachSerial(tx.inputs, function(input, next, i) { + var prevout = input.prevout; + var key, address; - // Only bother if this input is ours. - if (!paths) - return next(); + if (tx.isCoinbase()) + return next(); - self.getCoin(prevout.hash, prevout.index, function(err, coin) { - if (err) - return next(err); + address = input.getHash('hex'); + paths = info.getPaths(address); + + // Only bother if this input is ours. + if (!paths) + return next(); key = prevout.hash + '/' + prevout.index; // s/[outpoint-key] -> [spender-hash]|[spender-input-index] batch.put('s/' + key, bcoin.outpoint.fromTX(tx, i).toRaw()); - if (coin) { - // Add TX to inputs and spend money - input.coin = coin; - - // Skip invalid transactions - if (self.options.verify) { - if (!tx.verifyInput(i)) - return callback(null, false); - } - - updated = true; - - for (j = 0; j < paths.length; j++) { - path = paths[j]; - id = path.id + '/' + path.account; - batch.del('C/' + id + '/' + key); - } - - batch.del('c/' + key); - batch.put('d/' + tx.hash('hex') + '/' + pad32(i), coin.toRaw()); - - self.coinCache.remove(key); - - return next(); + if (!input.coin) { + // Add orphan, if no parent transaction is yet known + return self._addOrphan(key, hash, i, next); } - input.coin = null; + updated = true; - self.isSpent(prevout.hash, prevout.index, function(err, spent) { - if (err) - return next(err); + for (j = 0; j < paths.length; j++) { + path = paths[j]; + id = path.id + '/' + path.account; + batch.del('C/' + id + '/' + key); + } - // Are we double-spending? - // Replace older txs with newer ones. - if (spent) { - return self.getTX(prevout.hash, function(err, prev) { - if (err) - return next(err); + batch.del('c/' + key); + batch.put('d/' + hash + '/' + pad32(i), input.coin.toRaw()); - if (!prev) - return callback(new Error('Could not find double-spent coin.')); + self.coinCache.remove(key); - input.coin = bcoin.coin.fromTX(prev, prevout.index); - - // Skip invalid transactions - if (self.options.verify) { - if (!tx.verifyInput(i)) - return callback(null, false, info); - } - - return self._removeConflict(spent, tx, function(err, rtx, rinfo) { - if (err) - return next(err); - - // Spender was not removed, the current - // transaction is not elligible to be added. - if (!rtx) - return callback(null, false, info); - - self.emit('conflict', rtx, rinfo); - - batch.clear(); - - self._add(tx, info, callback, true); - }); - }); - } - - // Add orphan, if no parent transaction is yet known - self._addOrphan(key, hash, i, function(err, orphans) { - if (err) - return next(err); - - batch.put('o/' + key, orphans); - - return next(); - }); - }); - }); - }, function(err) { - if (err) - return callback(err); - - // Add unspent outputs or resolve orphans - utils.forEachSerial(tx.outputs, function(output, next, i) { - var address = output.getHash('hex'); - var key = hash + '/' + i; - var coin; - - if (output.script.isUnspendable()) - return next(); - - paths = info.getPaths(address); - - // Do not add unspents for outputs that aren't ours. - if (!paths) - return next(); - - coin = bcoin.coin.fromTX(tx, i); - - self._getOrphans(key, function(err, orphans) { - var some = false; - - if (err) - return callback(err); - - if (!orphans) - return finish(); - - // Add input to orphan - utils.forEachSerial(orphans, function(pair, next) { - if (some) - return next(); - - var input = pair[0]; - var orphan = pair[1]; - - // Probably removed by some other means. - if (!orphan) - return next(); - - orphan.inputs[input.index].coin = coin; - - assert(orphan.inputs[input.index].prevout.hash === hash); - assert(orphan.inputs[input.index].prevout.index === i); - - // Verify that input script is correct, if not - add - // output to unspent and remove orphan from storage - if (!self.options.verify || orphan.verifyInput(input.index)) { - some = true; - batch.put('d/' + input.hash + '/' + pad32(input.index), coin.toRaw()); - return next(); - } - - self.lazyRemove(orphan, next, true); - }, function(err) { - if (err) - return next(err); - - if (!some) - orphans = null; - - self.db.del('o/' + key, finish); - }); - - function finish(err) { - if (err) - return next(err); - - if (!orphans) { - for (j = 0; j < paths.length; j++) { - path = paths[j]; - id = path.id + '/' + path.account; - batch.put('C/' + id + '/' + key, DUMMY); - } - - coin = coin.toRaw(); - - batch.put('c/' + key, coin); - - self.coinCache.set(key, coin); - - updated = true; - } - - next(); - } - }); + next(); }, function(err) { if (err) return callback(err); - batch.write(function(err) { - if (err) - return callback(err); + // Add unspent outputs or resolve orphans + utils.forEachSerial(tx.outputs, function(output, next, i) { + var address = output.getHash('hex'); + var key = hash + '/' + i; + var coin; - self.walletdb.handleTX(tx, info, function(err) { + if (output.script.isUnspendable()) + return next(); + + paths = info.getPaths(address); + + // Do not add unspents for outputs that aren't ours. + if (!paths) + return next(); + + self._resolveOrphans(tx, i, function(err, orphans) { if (err) return callback(err); - self.emit('tx', tx, info); + if (orphans) + return next(); - if (updated) { - if (tx.ts !== 0) - self.emit('confirmed', tx, info); + coin = bcoin.coin.fromTX(tx, i); - self.emit('updated', tx, info); + for (j = 0; j < paths.length; j++) { + path = paths[j]; + id = path.id + '/' + path.account; + batch.put('C/' + id + '/' + key, DUMMY); } - return callback(null, true, info); + coin = coin.toRaw(); + + batch.put('c/' + key, coin); + + self.coinCache.set(key, coin); + + updated = true; + + next(); + }); + }, function(err) { + if (err) + return callback(err); + + self.commit(function(err) { + if (err) + return callback(err); + + self.walletdb.handleTX(tx, info, function(err) { + if (err) + return callback(err); + + self.emit('tx', tx, info); + + if (updated) { + if (tx.ts !== 0) + self.emit('confirmed', tx, info); + + self.emit('updated', tx, info); + } + + return callback(null, true, info); + }); }); }); }); }); - }, true); + }); }; /** @@ -763,7 +836,7 @@ TXDB.prototype._removeRecursive = function _removeRecursive(tx, callback) { // Remove all of the spender's spenders first. if (spent) { - return self.getTX(spent, function(err, tx) { + return self.getTX(spent.hash, function(err, tx) { if (err) return callback(err); @@ -780,8 +853,19 @@ TXDB.prototype._removeRecursive = function _removeRecursive(tx, callback) { if (err) return callback(err); + self.start(); + // Remove the spender. - return self.lazyRemove(tx, callback, true); + self._lazyRemove(tx, function(err, res1, res2) { + if (err) + return callback(err); + + self.commit(function(err) { + if (err) + return callback(err); + callback(null, res1, res2); + }); + }); }); }; @@ -817,8 +901,8 @@ TXDB.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) { TXDB.prototype.isSpent = function isSpent(hash, index, callback) { var key = 's/' + hash + '/' + index; - return this.db.fetch(key, function(hash) { - return hash.toString('hex', 0, 32); + return this.db.fetch(key, function(data) { + return bcoin.outpoint.fromRaw(data); }, callback); }; @@ -832,17 +916,10 @@ TXDB.prototype.isSpent = function isSpent(hash, index, callback) { * transaction was confirmed, or should be ignored. */ -TXDB.prototype._confirm = function _confirm(tx, info, callback, force) { +TXDB.prototype._confirm = function _confirm(tx, info, callback) { var self = this; var hash, batch, unlock, i, id; - unlock = this._lock(_confirm, [tx, info, callback], force); - - if (!unlock) - return; - - callback = utils.wrap(callback, unlock); - hash = tx.hash('hex'); this.getTX(hash, function(err, existing) { @@ -862,12 +939,12 @@ TXDB.prototype._confirm = function _confirm(tx, info, callback, force) { if (tx.ts === 0) return callback(null, true, info); - batch = self.db.batch(); - // Tricky - update the tx and coin in storage, // and remove pending flag to mark as confirmed. assert(tx.height >= 0); + self.start(); + batch = self.batch(); batch.put('t/' + hash, tx.toExtended()); batch.del('p/' + hash); @@ -909,19 +986,13 @@ TXDB.prototype._confirm = function _confirm(tx, info, callback, force) { if (err) return callback(err); - batch.write(function(err) { + self.emit('confirmed', tx, info); + self.emit('tx', tx, info); + + self.commit(function(err) { if (err) return callback(err); - - self.walletdb.syncOutputs(tx, info, function(err) { - if (err) - return callback(err); - - self.emit('confirmed', tx, info); - self.emit('tx', tx, info); - - return callback(null, true, info); - }); + return callback(null, true, info); }); }); }); @@ -955,7 +1026,19 @@ TXDB.prototype.remove = function remove(hash, callback, force) { if (!info) return callback(null, false); - return self._remove(tx, info, callback, force); + self.start(); + + return self._remove(tx, info, function(err, res1, res2) { + if (err) { + self.drop(); + return callback(err); + } + self.commit(function(err) { + if (err) + return callback(err); + callback(null, res1, res2); + }); + }); }); }); }; @@ -968,17 +1051,16 @@ TXDB.prototype.remove = function remove(hash, callback, force) { * @param {Function} callback - Returns [Error]. */ -TXDB.prototype.lazyRemove = function lazyRemove(tx, callback, force) { +TXDB.prototype._lazyRemove = function lazyRemove(tx, callback) { var self = this; - - return this.getInfo(tx, function(err, info) { + this.getInfo(tx, function(err, info) { if (err) return callback(err); if (!info) return callback(null, false); - return self._remove(tx, info, callback, force); + return self._remove(tx, info, callback); }); }; @@ -990,21 +1072,14 @@ TXDB.prototype.lazyRemove = function lazyRemove(tx, callback, force) { * @param {Function} callback - Returns [Error]. */ -TXDB.prototype._remove = function remove(tx, info, callback, force) { +TXDB.prototype._remove = function remove(tx, info, callback) { var self = this; var unlock, hash, batch, i, j, path, id; var key, paths, address, input, output, coin; - unlock = this._lock(remove, [tx, info, callback], force); - - if (!unlock) - return; - - callback = utils.wrap(callback, unlock); - hash = tx.hash('hex'); - batch = this.db.batch(); + batch = this.batch(); batch.del('t/' + hash); @@ -1085,14 +1160,9 @@ TXDB.prototype._remove = function remove(tx, info, callback, force) { self.coinCache.remove(key); } - batch.write(function(err) { - if (err) - return callback(err); + self.emit('remove tx', tx, info); - self.emit('remove tx', tx, info); - - return callback(null, true, info); - }); + return callback(null, true, info); }); }; @@ -1104,12 +1174,16 @@ TXDB.prototype._remove = function remove(tx, info, callback, force) { TXDB.prototype.unconfirm = function unconfirm(hash, callback, force) { var self = this; + var unlock = this._lock(unconfirm, [tx, callback], force); + + if (!unlock) + return; + + callback = utils.wrap(callback, unlock); if (hash.hash) hash = hash.hash('hex'); - callback = utils.ensure(callback); - this.getTX(hash, function(err, tx) { if (err) return callback(err); @@ -1126,7 +1200,19 @@ TXDB.prototype.unconfirm = function unconfirm(hash, callback, force) { if (!info) return callback(null, false); - return self._unconfirm(tx, info, callback, force); + self.start(); + + return self._unconfirm(tx, info, function(err, res1, res2) { + if (err) { + self.drop(); + return callback(err); + } + self.commit(function(err) { + if (err) + return callback(err); + callback(null, res1, res2); + }); + }); }); }); }; @@ -1142,13 +1228,7 @@ TXDB.prototype._unconfirm = function unconfirm(tx, info, callback, force) { var self = this; var batch, unlock, hash, height, i, id; - unlock = this._lock(unconfirm, [tx, info, callback], force); - - if (!unlock) - return; - - callback = utils.wrap(callback, unlock); - + batch = this.batch(); hash = tx.hash('hex'); height = tx.height; @@ -1197,14 +1277,9 @@ TXDB.prototype._unconfirm = function unconfirm(tx, info, callback, force) { if (err) return callback(err); - batch.write(function(err) { - if (err) - return callback(err); + self.emit('unconfirmed', tx, info); - self.emit('unconfirmed', tx, info); - - return callback(null, true, info); - }); + return callback(null, true, info); }); }; @@ -1907,7 +1982,7 @@ TXDB.prototype.zap = function zap(id, age, callback, force) { utils.forEachSerial(txs, function(tx, next) { if (tx.ts !== 0) return next(); - self.lazyRemove(tx, next, true); + self.remove(tx.hash('hex'), next, true); }, callback); }); }); diff --git a/test/wallet-test.js b/test/wallet-test.js index c684fbac..f0ebeec4 100644 --- a/test/wallet-test.js +++ b/test/wallet-test.js @@ -313,24 +313,30 @@ describe('Wallet', function() { return t + tx.getOutputValue(); }, 0); assert.equal(total, 154000); - dw.sign(t1, function(err) { + dw.getCoins(function(err, coins) { assert.ifError(err); - dw.getBalance(function(err, balance) { + dw.sign(t1, function(err) { assert.ifError(err); - assert.equal(balance.total, 11000); - walletdb.addTX(t1, function(err) { + dw.getBalance(function(err, balance) { assert.ifError(err); - dw.getBalance(function(err, balance) { + assert.equal(balance.total, 11000); + walletdb.addTX(t1, function(err) { assert.ifError(err); - assert.equal(balance.total, 6000); - dw.getHistory(function(err, txs) { + dw.getCoins(function(err, coins) { assert.ifError(err); - assert.equal(txs.length, 2); - var total = txs.reduce(function(t, tx) { - return t + tx.getOutputValue(); - }, 0); - assert.equal(total, 56000); - cb(); + dw.getBalance(function(err, balance) { + assert.ifError(err); + assert.equal(balance.total, 6000); + dw.getHistory(function(err, txs) { + assert.ifError(err); + assert.equal(txs.length, 2); + var total = txs.reduce(function(t, tx) { + return t + tx.getOutputValue(); + }, 0); + assert.equal(total, 56000); + cb(); + }); + }); }); }); });