txdb: refactor batches.

This commit is contained in:
Christopher Jeffrey 2016-08-11 15:53:47 -07:00
parent 400680e208
commit 9c94fae2a8
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 371 additions and 290 deletions

View File

@ -216,6 +216,7 @@ TXDB.prototype.mapAddresses = function mapAddresses(address, callback) {
*/
TXDB.prototype._addOrphan = function _addOrphan(key, hash, index, callback) {
var self = this;
var p;
this.db.get('o/' + key, function(err, buf) {
@ -229,7 +230,9 @@ TXDB.prototype._addOrphan = function _addOrphan(key, hash, index, callback) {
bcoin.outpoint(hash, index).toRaw(p);
return callback(null, p.render());
self.batch().put('o/' + key, p.render());
return callback();
});
};
@ -396,6 +399,46 @@ TXDB.prototype.removeBlock = function removeBlock(block, callback, force) {
});
};
TXDB.prototype.start = function start() {
assert(!this.current);
this.current = this.db.batch();
return this.current;
};
TXDB.prototype.put = function put(key, value) {
assert(this.current);
this.current.put(key, value);
};
TXDB.prototype.del = function del(key, value) {
assert(this.current);
this.current.del(key);
};
TXDB.prototype.batch = function batch() {
assert(this.current);
return this.current;
};
TXDB.prototype.drop = function drop() {
assert(this.current);
this.current.clear();
this.current = null;
};
TXDB.prototype.commit = function commit(callback) {
var self = this;
assert(this.current);
this.current.write(function(err) {
if (err) {
self.current = null;
return callback(err);
}
self.current = null;
return callback();
});
};
/**
* Add a transaction to the database, map addresses
* to wallet IDs, potentially store orphans, resolve
@ -406,6 +449,12 @@ TXDB.prototype.removeBlock = function removeBlock(block, callback, force) {
TXDB.prototype.add = function add(tx, callback, force) {
var self = this;
var unlock = this._lock(add, [tx, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
return this.getInfo(tx, function(err, info) {
if (err)
@ -420,22 +469,147 @@ TXDB.prototype.add = function add(tx, callback, force) {
self.logger.debug(info.paths);
return self._add(tx, info, callback, force);
return self._add(tx, info, callback);
});
};
TXDB.prototype._add = function add(tx, info, callback, force) {
TXDB.prototype._verify = function _verify(tx, info, callback) {
var self = this;
utils.forEachSerial(tx.inputs, function(input, next, i) {
var prevout = input.prevout;
var address, paths;
if (tx.isCoinbase())
return next();
address = input.getHash('hex');
paths = info.getPaths(address);
// Only bother if this input is ours.
if (!paths)
return next();
self.getCoin(prevout.hash, prevout.index, function(err, coin) {
if (err)
return next(err);
if (coin) {
// Add TX to inputs and spend money
input.coin = coin;
// Skip invalid transactions
if (self.options.verify) {
if (!tx.verifyInput(i))
return callback(null, false);
}
return next();
}
input.coin = null;
self.isSpent(prevout.hash, prevout.index, function(err, spent) {
if (err)
return next(err);
// Are we double-spending?
// Replace older txs with newer ones.
if (!spent)
return next();
self.getTX(prevout.hash, function(err, prev) {
if (err)
return next(err);
if (!prev)
return callback(new Error('Could not find double-spent coin.'));
input.coin = bcoin.coin.fromTX(prev, prevout.index);
// Skip invalid transactions
if (self.options.verify) {
if (!tx.verifyInput(i))
return callback(null, false);
}
self._removeConflict(spent.hash, tx, function(err, rtx, rinfo) {
if (err)
return next(err);
// Spender was not removed, the current
// transaction is not elligible to be added.
if (!rtx)
return callback(null, false);
self.emit('conflict', rtx, rinfo);
next();
});
});
});
});
}, function(err) {
if (err)
return callback(err);
return callback(null, true);
});
};
TXDB.prototype._resolveOrphans = function _resolveOrphans(tx, i, callback) {
var self = this;
var batch = this.batch();
var hash = tx.hash('hex');
var output = tx.outputs[i];
var key = hash + '/' + i;
var coin;
this._getOrphans(key, function(err, orphans) {
if (err)
return callback(err);
if (!orphans)
return callback(null, false);
batch.del('o/' + key);
coin = bcoin.coin.fromTX(tx, i);
// Add input to orphan
utils.forEachSerial(orphans, function(pair, next) {
var input = pair[0];
var orphan = pair[1];
// Probably removed by some other means.
if (!orphan)
return next();
orphan.inputs[input.index].coin = coin;
assert(orphan.inputs[input.index].prevout.hash === hash);
assert(orphan.inputs[input.index].prevout.index === i);
// Verify that input script is correct, if not - add
// output to unspent and remove orphan from storage
if (!self.options.verify || orphan.verifyInput(input.index)) {
batch.put('d/' + input.hash + '/' + pad32(input.index), coin.toRaw());
return callback(null, true);
}
self._lazyRemove(orphan, next);
}, function(err) {
if (err)
return next(err);
return callback(null, false);
});
});
};
TXDB.prototype._add = function add(tx, info, callback) {
var self = this;
var updated = false;
var batch, hash, i, j, unlock, path, paths, id;
unlock = this._lock(add, [tx, info, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
if (tx.mutable)
tx = tx.toTX();
@ -448,248 +622,147 @@ TXDB.prototype._add = function add(tx, info, callback, force) {
if (existing)
return callback(null, true, info);
hash = tx.hash('hex');
self._verify(tx, info, function(err, result) {
if (err)
return callback(err);
batch = self.db.batch();
if (!result)
return callback(null, result, info);
batch.put('t/' + hash, tx.toExtended());
hash = tx.hash('hex');
if (tx.ts === 0)
batch.put('p/' + hash, DUMMY);
else
batch.put('h/' + pad32(tx.height) + '/' + hash, DUMMY);
self.start();
batch = self.batch();
batch.put('t/' + hash, tx.toExtended());
batch.put('m/' + pad32(tx.ps) + '/' + hash, DUMMY);
for (i = 0; i < info.keys.length; i++) {
id = info.keys[i];
batch.put('T/' + id + '/' + hash, DUMMY);
if (tx.ts === 0)
batch.put('P/' + id + '/' + hash, DUMMY);
batch.put('p/' + hash, DUMMY);
else
batch.put('H/' + id + '/' + pad32(tx.height) + '/' + hash, DUMMY);
batch.put('M/' + id + '/' + pad32(tx.ps) + '/' + hash, DUMMY);
}
batch.put('h/' + pad32(tx.height) + '/' + hash, DUMMY);
// Consume unspent money or add orphans
utils.forEachSerial(tx.inputs, function(input, next, i) {
var prevout = input.prevout;
var key, address;
batch.put('m/' + pad32(tx.ps) + '/' + hash, DUMMY);
if (tx.isCoinbase())
return next();
for (i = 0; i < info.keys.length; i++) {
id = info.keys[i];
batch.put('T/' + id + '/' + hash, DUMMY);
if (tx.ts === 0)
batch.put('P/' + id + '/' + hash, DUMMY);
else
batch.put('H/' + id + '/' + pad32(tx.height) + '/' + hash, DUMMY);
batch.put('M/' + id + '/' + pad32(tx.ps) + '/' + hash, DUMMY);
}
address = input.getHash('hex');
paths = info.getPaths(address);
// Consume unspent money or add orphans
utils.forEachSerial(tx.inputs, function(input, next, i) {
var prevout = input.prevout;
var key, address;
// Only bother if this input is ours.
if (!paths)
return next();
if (tx.isCoinbase())
return next();
self.getCoin(prevout.hash, prevout.index, function(err, coin) {
if (err)
return next(err);
address = input.getHash('hex');
paths = info.getPaths(address);
// Only bother if this input is ours.
if (!paths)
return next();
key = prevout.hash + '/' + prevout.index;
// s/[outpoint-key] -> [spender-hash]|[spender-input-index]
batch.put('s/' + key, bcoin.outpoint.fromTX(tx, i).toRaw());
if (coin) {
// Add TX to inputs and spend money
input.coin = coin;
// Skip invalid transactions
if (self.options.verify) {
if (!tx.verifyInput(i))
return callback(null, false);
}
updated = true;
for (j = 0; j < paths.length; j++) {
path = paths[j];
id = path.id + '/' + path.account;
batch.del('C/' + id + '/' + key);
}
batch.del('c/' + key);
batch.put('d/' + tx.hash('hex') + '/' + pad32(i), coin.toRaw());
self.coinCache.remove(key);
return next();
if (!input.coin) {
// Add orphan, if no parent transaction is yet known
return self._addOrphan(key, hash, i, next);
}
input.coin = null;
updated = true;
self.isSpent(prevout.hash, prevout.index, function(err, spent) {
if (err)
return next(err);
for (j = 0; j < paths.length; j++) {
path = paths[j];
id = path.id + '/' + path.account;
batch.del('C/' + id + '/' + key);
}
// Are we double-spending?
// Replace older txs with newer ones.
if (spent) {
return self.getTX(prevout.hash, function(err, prev) {
if (err)
return next(err);
batch.del('c/' + key);
batch.put('d/' + hash + '/' + pad32(i), input.coin.toRaw());
if (!prev)
return callback(new Error('Could not find double-spent coin.'));
self.coinCache.remove(key);
input.coin = bcoin.coin.fromTX(prev, prevout.index);
// Skip invalid transactions
if (self.options.verify) {
if (!tx.verifyInput(i))
return callback(null, false, info);
}
return self._removeConflict(spent, tx, function(err, rtx, rinfo) {
if (err)
return next(err);
// Spender was not removed, the current
// transaction is not elligible to be added.
if (!rtx)
return callback(null, false, info);
self.emit('conflict', rtx, rinfo);
batch.clear();
self._add(tx, info, callback, true);
});
});
}
// Add orphan, if no parent transaction is yet known
self._addOrphan(key, hash, i, function(err, orphans) {
if (err)
return next(err);
batch.put('o/' + key, orphans);
return next();
});
});
});
}, function(err) {
if (err)
return callback(err);
// Add unspent outputs or resolve orphans
utils.forEachSerial(tx.outputs, function(output, next, i) {
var address = output.getHash('hex');
var key = hash + '/' + i;
var coin;
if (output.script.isUnspendable())
return next();
paths = info.getPaths(address);
// Do not add unspents for outputs that aren't ours.
if (!paths)
return next();
coin = bcoin.coin.fromTX(tx, i);
self._getOrphans(key, function(err, orphans) {
var some = false;
if (err)
return callback(err);
if (!orphans)
return finish();
// Add input to orphan
utils.forEachSerial(orphans, function(pair, next) {
if (some)
return next();
var input = pair[0];
var orphan = pair[1];
// Probably removed by some other means.
if (!orphan)
return next();
orphan.inputs[input.index].coin = coin;
assert(orphan.inputs[input.index].prevout.hash === hash);
assert(orphan.inputs[input.index].prevout.index === i);
// Verify that input script is correct, if not - add
// output to unspent and remove orphan from storage
if (!self.options.verify || orphan.verifyInput(input.index)) {
some = true;
batch.put('d/' + input.hash + '/' + pad32(input.index), coin.toRaw());
return next();
}
self.lazyRemove(orphan, next, true);
}, function(err) {
if (err)
return next(err);
if (!some)
orphans = null;
self.db.del('o/' + key, finish);
});
function finish(err) {
if (err)
return next(err);
if (!orphans) {
for (j = 0; j < paths.length; j++) {
path = paths[j];
id = path.id + '/' + path.account;
batch.put('C/' + id + '/' + key, DUMMY);
}
coin = coin.toRaw();
batch.put('c/' + key, coin);
self.coinCache.set(key, coin);
updated = true;
}
next();
}
});
next();
}, function(err) {
if (err)
return callback(err);
batch.write(function(err) {
if (err)
return callback(err);
// Add unspent outputs or resolve orphans
utils.forEachSerial(tx.outputs, function(output, next, i) {
var address = output.getHash('hex');
var key = hash + '/' + i;
var coin;
self.walletdb.handleTX(tx, info, function(err) {
if (output.script.isUnspendable())
return next();
paths = info.getPaths(address);
// Do not add unspents for outputs that aren't ours.
if (!paths)
return next();
self._resolveOrphans(tx, i, function(err, orphans) {
if (err)
return callback(err);
self.emit('tx', tx, info);
if (orphans)
return next();
if (updated) {
if (tx.ts !== 0)
self.emit('confirmed', tx, info);
coin = bcoin.coin.fromTX(tx, i);
self.emit('updated', tx, info);
for (j = 0; j < paths.length; j++) {
path = paths[j];
id = path.id + '/' + path.account;
batch.put('C/' + id + '/' + key, DUMMY);
}
return callback(null, true, info);
coin = coin.toRaw();
batch.put('c/' + key, coin);
self.coinCache.set(key, coin);
updated = true;
next();
});
}, function(err) {
if (err)
return callback(err);
self.commit(function(err) {
if (err)
return callback(err);
self.walletdb.handleTX(tx, info, function(err) {
if (err)
return callback(err);
self.emit('tx', tx, info);
if (updated) {
if (tx.ts !== 0)
self.emit('confirmed', tx, info);
self.emit('updated', tx, info);
}
return callback(null, true, info);
});
});
});
});
});
}, true);
});
};
/**
@ -763,7 +836,7 @@ TXDB.prototype._removeRecursive = function _removeRecursive(tx, callback) {
// Remove all of the spender's spenders first.
if (spent) {
return self.getTX(spent, function(err, tx) {
return self.getTX(spent.hash, function(err, tx) {
if (err)
return callback(err);
@ -780,8 +853,19 @@ TXDB.prototype._removeRecursive = function _removeRecursive(tx, callback) {
if (err)
return callback(err);
self.start();
// Remove the spender.
return self.lazyRemove(tx, callback, true);
self._lazyRemove(tx, function(err, res1, res2) {
if (err)
return callback(err);
self.commit(function(err) {
if (err)
return callback(err);
callback(null, res1, res2);
});
});
});
};
@ -817,8 +901,8 @@ TXDB.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) {
TXDB.prototype.isSpent = function isSpent(hash, index, callback) {
var key = 's/' + hash + '/' + index;
return this.db.fetch(key, function(hash) {
return hash.toString('hex', 0, 32);
return this.db.fetch(key, function(data) {
return bcoin.outpoint.fromRaw(data);
}, callback);
};
@ -832,17 +916,10 @@ TXDB.prototype.isSpent = function isSpent(hash, index, callback) {
* transaction was confirmed, or should be ignored.
*/
TXDB.prototype._confirm = function _confirm(tx, info, callback, force) {
TXDB.prototype._confirm = function _confirm(tx, info, callback) {
var self = this;
var hash, batch, unlock, i, id;
unlock = this._lock(_confirm, [tx, info, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
hash = tx.hash('hex');
this.getTX(hash, function(err, existing) {
@ -862,12 +939,12 @@ TXDB.prototype._confirm = function _confirm(tx, info, callback, force) {
if (tx.ts === 0)
return callback(null, true, info);
batch = self.db.batch();
// Tricky - update the tx and coin in storage,
// and remove pending flag to mark as confirmed.
assert(tx.height >= 0);
self.start();
batch = self.batch();
batch.put('t/' + hash, tx.toExtended());
batch.del('p/' + hash);
@ -909,19 +986,13 @@ TXDB.prototype._confirm = function _confirm(tx, info, callback, force) {
if (err)
return callback(err);
batch.write(function(err) {
self.emit('confirmed', tx, info);
self.emit('tx', tx, info);
self.commit(function(err) {
if (err)
return callback(err);
self.walletdb.syncOutputs(tx, info, function(err) {
if (err)
return callback(err);
self.emit('confirmed', tx, info);
self.emit('tx', tx, info);
return callback(null, true, info);
});
return callback(null, true, info);
});
});
});
@ -955,7 +1026,19 @@ TXDB.prototype.remove = function remove(hash, callback, force) {
if (!info)
return callback(null, false);
return self._remove(tx, info, callback, force);
self.start();
return self._remove(tx, info, function(err, res1, res2) {
if (err) {
self.drop();
return callback(err);
}
self.commit(function(err) {
if (err)
return callback(err);
callback(null, res1, res2);
});
});
});
});
};
@ -968,17 +1051,16 @@ TXDB.prototype.remove = function remove(hash, callback, force) {
* @param {Function} callback - Returns [Error].
*/
TXDB.prototype.lazyRemove = function lazyRemove(tx, callback, force) {
TXDB.prototype._lazyRemove = function lazyRemove(tx, callback) {
var self = this;
return this.getInfo(tx, function(err, info) {
this.getInfo(tx, function(err, info) {
if (err)
return callback(err);
if (!info)
return callback(null, false);
return self._remove(tx, info, callback, force);
return self._remove(tx, info, callback);
});
};
@ -990,21 +1072,14 @@ TXDB.prototype.lazyRemove = function lazyRemove(tx, callback, force) {
* @param {Function} callback - Returns [Error].
*/
TXDB.prototype._remove = function remove(tx, info, callback, force) {
TXDB.prototype._remove = function remove(tx, info, callback) {
var self = this;
var unlock, hash, batch, i, j, path, id;
var key, paths, address, input, output, coin;
unlock = this._lock(remove, [tx, info, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
hash = tx.hash('hex');
batch = this.db.batch();
batch = this.batch();
batch.del('t/' + hash);
@ -1085,14 +1160,9 @@ TXDB.prototype._remove = function remove(tx, info, callback, force) {
self.coinCache.remove(key);
}
batch.write(function(err) {
if (err)
return callback(err);
self.emit('remove tx', tx, info);
self.emit('remove tx', tx, info);
return callback(null, true, info);
});
return callback(null, true, info);
});
};
@ -1104,12 +1174,16 @@ TXDB.prototype._remove = function remove(tx, info, callback, force) {
TXDB.prototype.unconfirm = function unconfirm(hash, callback, force) {
var self = this;
var unlock = this._lock(unconfirm, [tx, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
if (hash.hash)
hash = hash.hash('hex');
callback = utils.ensure(callback);
this.getTX(hash, function(err, tx) {
if (err)
return callback(err);
@ -1126,7 +1200,19 @@ TXDB.prototype.unconfirm = function unconfirm(hash, callback, force) {
if (!info)
return callback(null, false);
return self._unconfirm(tx, info, callback, force);
self.start();
return self._unconfirm(tx, info, function(err, res1, res2) {
if (err) {
self.drop();
return callback(err);
}
self.commit(function(err) {
if (err)
return callback(err);
callback(null, res1, res2);
});
});
});
});
};
@ -1142,13 +1228,7 @@ TXDB.prototype._unconfirm = function unconfirm(tx, info, callback, force) {
var self = this;
var batch, unlock, hash, height, i, id;
unlock = this._lock(unconfirm, [tx, info, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
batch = this.batch();
hash = tx.hash('hex');
height = tx.height;
@ -1197,14 +1277,9 @@ TXDB.prototype._unconfirm = function unconfirm(tx, info, callback, force) {
if (err)
return callback(err);
batch.write(function(err) {
if (err)
return callback(err);
self.emit('unconfirmed', tx, info);
self.emit('unconfirmed', tx, info);
return callback(null, true, info);
});
return callback(null, true, info);
});
};
@ -1907,7 +1982,7 @@ TXDB.prototype.zap = function zap(id, age, callback, force) {
utils.forEachSerial(txs, function(tx, next) {
if (tx.ts !== 0)
return next();
self.lazyRemove(tx, next, true);
self.remove(tx.hash('hex'), next, true);
}, callback);
});
});

View File

@ -313,24 +313,30 @@ describe('Wallet', function() {
return t + tx.getOutputValue();
}, 0);
assert.equal(total, 154000);
dw.sign(t1, function(err) {
dw.getCoins(function(err, coins) {
assert.ifError(err);
dw.getBalance(function(err, balance) {
dw.sign(t1, function(err) {
assert.ifError(err);
assert.equal(balance.total, 11000);
walletdb.addTX(t1, function(err) {
dw.getBalance(function(err, balance) {
assert.ifError(err);
dw.getBalance(function(err, balance) {
assert.equal(balance.total, 11000);
walletdb.addTX(t1, function(err) {
assert.ifError(err);
assert.equal(balance.total, 6000);
dw.getHistory(function(err, txs) {
dw.getCoins(function(err, coins) {
assert.ifError(err);
assert.equal(txs.length, 2);
var total = txs.reduce(function(t, tx) {
return t + tx.getOutputValue();
}, 0);
assert.equal(total, 56000);
cb();
dw.getBalance(function(err, balance) {
assert.ifError(err);
assert.equal(balance.total, 6000);
dw.getHistory(function(err, txs) {
assert.ifError(err);
assert.equal(txs.length, 2);
var total = txs.reduce(function(t, tx) {
return t + tx.getOutputValue();
}, 0);
assert.equal(total, 56000);
cb();
});
});
});
});
});