txdb and mempool work.

This commit is contained in:
Christopher Jeffrey 2016-03-26 01:29:04 -07:00
parent e205c70f97
commit 37c488c802
4 changed files with 303 additions and 222 deletions

View File

@ -118,12 +118,21 @@ Fullnode.prototype._init = function _init() {
// Update the mempool.
this.chain.on('add block', function(block) {
self.mempool.addBlock(block);
self.mempool.addBlock(block, function(err) {
if (err)
self.emit('error', err);
});
});
this.chain.on('remove block', function(block) {
self.mempool.removeBlock(block);
self.walletdb.removeBlock(block);
self.mempool.removeBlock(block, function(err) {
if (err)
self.emit('error', err);
});
self.walletdb.removeBlock(block, function(err) {
if (err)
self.emit('error', err);
});
});
function load(err) {
@ -149,7 +158,7 @@ Fullnode.prototype._init = function _init() {
self.createWallet(options, function(err, wallet) {
if (err)
throw err;
return self.emit('error', err);
// Set the miner payout address if the
// programmer didn't pass one in.
@ -340,22 +349,14 @@ Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback)
Fullnode.prototype.fillCoin = function fillCoin(tx, callback) {
var self = this;
this.mempool.tx.isDoubleSpend(tx, function(err, result) {
this.mempool.fillCoin(tx, function(err) {
if (err)
return callback(err);
if (result)
return callback(null, tx, true);
if (tx.hasPrevout())
return callback(null, tx);
self.mempool.fillCoin(tx, function(err) {
if (err)
return callback(err);
if (tx.hasPrevout())
return callback(null, tx);
self.chain.db.fillCoin(tx, callback);
});
self.chain.db.fillCoin(tx, callback);
});
};

View File

@ -31,7 +31,6 @@ function Mempool(node, options) {
this.options = options;
this.node = node;
this.chain = node.chain;
// this.db = node.chain.db.db;
this.db = bcoin.ldb('mempool', {
db: 'memdown'
@ -100,22 +99,34 @@ Mempool.prototype.open = function open(callback) {
return this.db.open(callback);
};
Mempool.prototype.addBlock = function addBlock(block, callback) {
Mempool.prototype.addBlock = function addBlock(block, callback, force) {
var self = this;
callback = utils.ensure(callback);
// Remove now-mined transactions
// XXX should batch this
utils.forEachSerial(block.txs, function(tx, next) {
self.tx.remove(tx, next);
}, callback);
var unlock = this._lock(addBlock, [block, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
this.open(function(err) {
if (err)
return callback(err);
utils.forEachSerial(block.txs, function(tx, next) {
self.tx.removeUnchecked(tx, next);
}, callback);
});
};
Mempool.prototype.removeBlock = function removeBlock(block, callback) {
Mempool.prototype.removeBlock = function removeBlock(block, callback, force) {
var self = this;
callback = utils.ensure(callback);
// XXX should batch this
var unlock = this._lock(removeBlock, [block, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
utils.forEachSerial(block.txs.slice().reverse(), function(tx, next) {
self.tx.add(tx, next);
self.tx.addUnchecked(tx, next);
}, callback);
};
@ -208,7 +219,7 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) {
if (exists)
return callback();
self.node.fillCoin(tx, function(err, tx, doubleSpend) {
self.tx.isDoubleSpend(tx, function(err, doubleSpend) {
if (err)
return callback(err);
@ -217,17 +228,22 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) {
return callback(new VerifyError('bad-txns-inputs-spent', 0));
}
if (!tx.hasPrevout())
return self.storeOrphan(tx, callback);
self.verify(tx, function(err) {
if (err) {
if (err.type === 'VerifyError' && err.score >= 0)
peer.sendReject(tx, err.reason, err.score);
self.node.fillCoin(tx, function(err) {
if (err)
return callback(err);
}
self.addUnchecked(tx, peer, callback);
if (!tx.hasPrevout())
return self.storeOrphan(tx, callback);
self.verify(tx, function(err) {
if (err) {
if (err.type === 'VerifyError' && err.score >= 0)
peer.sendReject(tx, err.reason, err.score);
return callback(err);
}
self.addUnchecked(tx, peer, callback);
});
});
});
});
@ -235,7 +251,7 @@ Mempool.prototype.addTX = function addTX(tx, peer, callback, force) {
Mempool.prototype.addUnchecked = function addUnchecked(tx, peer, callback) {
var self = this;
self.tx.add(tx, function(err) {
self.tx.addUnchecked(tx, function(err) {
if (err)
return callback(err);
@ -351,7 +367,7 @@ Mempool.prototype._hasTX = function hasTX(tx, callback, force) {
if (result)
return callback(null, result);
self.db.get('D/' + hash, function(err, tx) {
self.db.get('m/D/' + hash, function(err, tx) {
if (err && err.type !== 'NotFoundError')
return callback(err);
@ -378,7 +394,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
assert(outputs.length > 0);
utils.forEachSerial(outputs, function(key, next) {
self.db.get('d/' + key, function(err, buf) {
self.db.get('m/d/' + key, function(err, buf) {
if (err && err.type !== 'NotFoundError')
return next(err);
@ -389,7 +405,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
p.writeHash(hash);
batch.put('d/' + key, p.render());
batch.put('m/d/' + key, p.render());
next();
});
@ -397,7 +413,7 @@ Mempool.prototype.storeOrphan = function storeOrphan(tx, callback, force) {
if (err)
return callback(err);
batch.put('D/' + hash, tx.toExtended(true));
batch.put('m/D/' + hash, tx.toExtended(true));
batch.write(callback);
});
};
@ -418,7 +434,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
var batch = this.db.batch();
var p;
this.db.get('d/' + hash, function(err, buf) {
this.db.get('m/d/' + hash, function(err, buf) {
if (err && err.type !== 'NotFoundError')
return callback(err);
@ -439,7 +455,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
p.end();
utils.forEachSerial(hashes, function(orphanHash, next, i) {
self.db.get('D/' + orphanHash, function(err, orphan) {
self.db.get('m/D/' + orphanHash, function(err, orphan) {
if (err && err.type !== 'NotFoundError')
return next(err);
@ -452,10 +468,13 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
return next(e);
}
orphan.fillPrevout(tx);
orphan.inputs.forEach(function(input) {
if (!input.output && input.prevout.hash === hash)
input.output = bcoin.coin(tx, input.prevout.index);
});
if (orphan.hasPrevout()) {
batch.del('D/' + orphanHash);
batch.del('m/D/' + orphanHash);
return self.verify(orphan, function(err) {
if (err) {
if (err.type === 'VerifyError')
@ -467,7 +486,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
});
}
batch.put('D/' + orphanHash, orphan.toExtended(true));
batch.put('m/D/' + orphanHash, orphan.toExtended(true));
next();
});
}, function(err) {
@ -481,7 +500,7 @@ Mempool.prototype.resolveOrphans = function resolveOrphans(tx, callback, force)
return callback(null, resolved);
}
batch.del('d/' + hash);
batch.del('m/d/' + hash);
return batch.write(done);
});
@ -492,40 +511,6 @@ Mempool.prototype.getInv = function getInv(callback) {
return this.tx.getAllHashes(callback);
};
Mempool.prototype.remove =
Mempool.prototype.removeTX = function removeTX(hash, callback, force) {
var self = this;
var unlock = this._lock(removeTX, [hash, callback], force);
if (!unlock)
return;
function getTX() {
if (hash instanceof bcoin.tx)
return callback(null, hash);
return self.getTX(hash, function(err, tx) {
if (err)
return callback(err);
if (!tx)
return callback();
return self.node.fillTX(tx, callback);
});
}
return getTX(function(err, tx) {
if (err)
return callback(err);
self.tx.remove(tx, function(err) {
if (err)
return callback(err);
self.emit('remove tx', tx);
});
});
};
Mempool.prototype.checkTX = function checkTX(tx, peer) {
return Mempool.checkTX(tx, peer);
};

View File

@ -30,6 +30,7 @@ function TXPool(prefix, db, options) {
this.options = options;
this.busy = false;
this.jobs = [];
this.locker = new bcoin.locker(this);
if (this.options.mapAddress)
this.options.indexAddress = true;
@ -38,40 +39,7 @@ function TXPool(prefix, db, options) {
utils.inherits(TXPool, EventEmitter);
TXPool.prototype._lock = function _lock(func, args, force) {
var self = this;
var called;
if (force) {
assert(this.busy);
return function unlock() {
assert(!called);
called = true;
};
}
if (this.busy) {
this.jobs.push([func, args]);
return;
}
this.busy = true;
return function unlock() {
var item;
assert(!called);
called = true;
self.busy = false;
if (self.jobs.length === 0) {
self.emit('flush');
return;
}
item = self.jobs.shift();
item[0].apply(self, item[1]);
};
return this.locker.lock(func, args, force);
};
TXPool.prototype.getMap = function getMap(tx, callback) {
@ -325,20 +293,24 @@ TXPool.prototype._add = function add(tx, map, callback, force) {
// Consume unspent money or add orphans
utils.forEachSerial(tx.inputs, function(input, next, i) {
var key;
var key, address;
if (tx.isCoinbase())
return next();
key = input.prevout.hash + '/' + input.prevout.index;
address = input.getAddress();
// Only add orphans if this input is ours.
if (self.options.mapAddress) {
if (!address || !map[address].length)
return next();
}
self.getCoin(input.prevout.hash, input.prevout.index, function(err, coin) {
var address;
if (err)
return next(err);
address = input.getAddress();
key = input.prevout.hash + '/' + input.prevout.index;
if (coin) {
// Add TX to inputs and spend money
@ -354,50 +326,52 @@ TXPool.prototype._add = function add(tx, map, callback, force) {
if (self.options.indexAddress && address) {
map[address].forEach(function(id) {
batch.del(
prefix + 'u/a/' + id
+ '/' + input.prevout.hash
+ '/' + input.prevout.index);
batch.del(prefix + 'u/a/' + id + '/' + key);
});
}
batch.del(
prefix + 'u/t/'
+ input.prevout.hash
+ '/' + input.prevout.index);
if (self.options.indexSpent) {
batch.put(
prefix + 's/t/'
+ input.prevout.hash
+ '/' + input.prevout.index,
tx.hash());
}
batch.del(prefix + 'u/t/' + key);
batch.put(prefix + 's/t/' + key, tx.hash());
return next();
}
input.output = null;
// Only add orphans if this input is ours.
if (self.options.mapAddress) {
if (!address || !map[address].length)
return next();
}
self.isSpent(input.prevout.hash, input.prevout.index, function(err, result) {
self.isSpent(input.prevout.hash, input.prevout.index, function(err, spentBy) {
if (err)
return next(err);
// Are we double-spending?
if (result) {
if (!self.options.indexSpent)
return next(new Error('Transaction is double-spending.'));
return self._removeSpenders(hash, function(err) {
// Replace older txs with newer ones.
if (spentBy) {
return self.getTX(input.prevout.hash, function(err, prev) {
if (err)
return next(err);
batch.clear();
self._add(tx, map, callback, true);
if (!prev)
return callback(new Error('Could not find double-spent coin.'));
input.output = bcoin.coin(prev, input.prevout.index);
// Skip invalid transactions
if (self.options.verify) {
if (!tx.verify(i))
return callback(null, false);
}
return self._removeSpenders(spentBy, tx, function(err, result) {
if (err)
return next(err);
if (!result) {
assert(tx.ts === 0, 'I\'m confused');
return callback(null, false);
}
batch.clear();
self._add(tx, map, callback, true);
});
});
}
@ -525,7 +499,7 @@ TXPool.prototype._add = function add(tx, map, callback, force) {
}, true);
};
TXPool.prototype._removeSpenders = function removeSpenders(hash, callback) {
TXPool.prototype._removeSpenders = function removeSpenders(hash, ref, callback) {
var self = this;
this.getTX(hash, function(err, tx) {
if (err)
@ -535,54 +509,31 @@ TXPool.prototype._removeSpenders = function removeSpenders(hash, callback) {
return callback(new Error('Could not find spender.'));
if (tx.ts !== 0)
return callback(new Error('Transaction is double-spending.'));
return callback(null, false);
utils.forEachSerial(tx.outputs, function(next, output, i) {
if (ref.ts === 0 && ref.ps < ts.ps)
return callback(null, false);
utils.forEachSerial(tx.outputs, function(output, next, i) {
self.isSpent(hash, i, function(err, spent) {
if (err)
return next(err);
if (spent)
return self._removeSpenders(spent, next);
return self._removeSpenders(spent, ref, next);
next();
});
}, function(err) {
if (err)
return callback(err);
return self.lazyRemove(tx, callback, true);
return self.lazyRemove(tx, function(err) {
if (err)
return callback(err);
return callback(null, true);
}, true);
});
});
};
TXPool.prototype._collectSpenders = function _collectSpenders(tx, callback, spenders) {
var self = this;
var hash = tx.hash('hex');
if (!spenders)
spenders = [];
utils.forEachSerial(tx.outputs, function(next, output, i) {
self.isSpent(hash, i, function(err, spentBy) {
if (err)
return next(err);
if (spentBy) {
spenders.push(spentBy);
return self.getTX(spentBy, function(err, tx) {
if (err)
return next(err);
if (!tx)
return next();
return self._removeSpenders(tx, next, spenders);
});
}
next();
});
}, function(err) {
if (err)
return callback(err);
return callback(null, spenders);
});
};
TXPool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) {
var self = this;
utils.everySerial(tx.inputs, function(input, next) {
@ -601,36 +552,19 @@ TXPool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) {
});
};
TXPool.prototype.isSpent = function isSpent(hash, index, callback, checkCoin) {
TXPool.prototype.isSpent = function isSpent(hash, index, callback) {
var self = this;
var prefix = this.prefix + '/';
var key = prefix + 's/t/' + hash + '/' + index;
if (this.options.indexSpent) {
return this.db.get('s/t/' + hash + '/' + index, function(err, exists) {
if (err && err.type !== 'NotFoundError')
return callback(err);
return callback(null, exists ? exists.toString('hex') : null);
});
}
function getCoin(callback) {
if (!checkCoin)
return callback(null, null);
return self.getCoin(hash, index, callback);
}
return getCoin(function(err, coin) {
if (err)
return this.db.get(key, function(err, hash) {
if (err && err.type !== 'NotFoundError')
return callback(err);
if (coin)
return callback(null, false);
if (!hash)
return callback(null, null);
return self.getTX(hash, function(err, tx) {
if (err)
return callback(err);
return callback(null, !!tx);
});
return callback(null, utils.toHex(hash));
});
};
@ -728,10 +662,6 @@ TXPool.prototype._confirm = function _confirm(tx, map, callback, force) {
TXPool.prototype.remove = function remove(hash, callback, force) {
var self = this;
var first = Array.isArray(hash) ? hash[0] : hash;
if (!this.options.indexExtra && (first instanceof bcoin.tx))
return this.lazyRemove(hash, callback, force);
if (Array.isArray(hash)) {
return utils.forEachSerial(hash, function(hash, next) {
@ -858,11 +788,9 @@ TXPool.prototype._remove = function remove(tx, map, callback, force) {
+ '/' + input.prevout.index,
input.output.toRaw());
if (self.options.indexSpent) {
batch.del(prefix + 's/t/'
+ input.prevout.hash
+ '/' + input.prevout.index);
}
batch.del(prefix + 's/t/'
+ input.prevout.hash
+ '/' + input.prevout.index);
batch.del(prefix + 'o/' + input.prevout.hash + '/' + input.prevout.index);
});
@ -1603,6 +1531,158 @@ TXPool.prototype.getBalance = function getBalance(callback) {
return this.getBalanceByAddress(null, callback);
};
TXPool.prototype.addUnchecked = function addUnchecked(tx, callback, force) {
var self = this;
var prefix = this.prefix + '/';
var hash = tx.hash('hex');
var batch;
var unlock = this._lock(addUnchecked, [tx, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
batch = this.db.batch();
batch.put(prefix + 't/t/' + hash, tx.toExtended());
tx.getAddresses().forEach(function(address) {
batch.put(prefix + 't/a/' + address + '/' + hash, DUMMY);
});
tx.inputs.forEach(function(input) {
var key = input.prevout.hash + '/' + input.prevout.index;
var address;
if (tx.isCoinbase())
return;
assert(input.output);
address = input.getAddress();
batch.del(prefix + 'u/t/' + key);
batch.put(prefix + 's/t/' + key, tx.hash());
if (address)
batch.del(prefix + 'u/a/' + address + '/' + key);
});
tx.outputs.forEach(function(output, i) {
var key = hash + '/' + i;
var address = output.getAddress();
var coin = bcoin.coin(tx, i).toRaw();
batch.put(prefix + 'u/t/' + key, coin);
if (address)
batch.put(prefix + 'u/a/' + address + '/' + key, DUMMY);
});
return batch.write(function(err) {
if (err)
return callback(err);
self.emit('add tx', tx);
return callback();
});
};
TXPool.prototype.removeUnchecked = function removeUnchecked(tx, callback, force) {
var self = this;
var prefix = this.prefix + '/';
var hash = tx.hash('hex');
var batch;
var unlock = this._lock(removeUnchecked, [tx, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
batch = this.db.batch();
batch.del(prefix + 't/t/' + hash);
batch.del(prefix + 'D/' + hash);
tx.getAddresses().forEach(function(address) {
batch.del(prefix + 't/a/' + address + '/' + hash);
});
tx.inputs.forEach(function(input) {
var key = input.prevout.hash + '/' + input.prevout.index;
var address;
if (tx.isCoinbase())
return;
if (!input.output)
return;
address = input.getAddress();
batch.del(prefix + 'u/t/' + key);
batch.del(prefix + 's/t/' + key);
if (address)
batch.del(prefix + 'u/a/' + address + '/' + key);
});
tx.outputs.forEach(function(output, i) {
var key = hash + '/' + i;
var address = output.getAddress();
batch.del(prefix + 'u/t/' + key);
if (address)
batch.del(prefix + 'u/a/' + address + '/' + key);
});
batch.write(function(err) {
if (err)
return callback(err);
self.emit('remove tx', tx);
return callback();
});
};
TXPool.prototype.zap = function zap(tip, callback, force) {
var self = this;
var now = tip.ts;
var age = 10 * 60 * 60;
var unlock = this._lock(zap, [tip, callback], force);
if (!unlock)
return;
callback = utils.wrap(callback, unlock);
// return this.getRange(null, {
// start: 0,
// end: now - age
// }, function(err, txs) {
// });
this.getPending(function(err, txs) {
if (err)
return callback(err);
txs = txs.filter(function(tx) {
return now > tx.ps + age;
});
if (txs.length === 0)
return callback();
self.fillTX(txs, function(err) {
if (err)
return callback(err);
utils.forEachSerial(txs, function(tx, next) {
self.lazyRemove(tx, next);
}, callback);
});
});
};
/**
* Expose
*/

View File

@ -19,6 +19,7 @@ var dummyInput = {
index: 0
},
script: new bcoin.script([]),
witness: new bcoin.script.witness([]),
sequence: 0xffffffff
};
@ -131,11 +132,13 @@ describe('Wallet', function() {
assert(tx.verify());
});
var dw, di;
it('should have TX pool and be serializable', function(cb) {
wdb.create({}, function(err, w) {
assert.noError(err);
wdb.create({}, function(err, f) {
assert.noError(err);
dw = w;
// Coinbase
var t1 = bcoin.mtx().addOutput(w, 50000).addOutput(w, 1000);
@ -145,6 +148,7 @@ describe('Wallet', function() {
var t2 = bcoin.mtx().addInput(t1, 0) // 50000
.addOutput(w, 24000)
.addOutput(w, 24000);
di = t2.inputs[0];
// balance: 49000
w.sign(t2);
var t3 = bcoin.mtx().addInput(t1, 1) // 1000
@ -179,8 +183,6 @@ describe('Wallet', function() {
fake.hint = 'fake';
// Fake TX should temporarly change output
// wdb.addTX(fake);
wdb.addTX(fake, function(err) {
assert.noError(err);
wdb.addTX(t4, function(err) {
@ -234,6 +236,19 @@ describe('Wallet', function() {
});
});
it('should cleanup spenders after double-spend', function(cb) {
var t1 = bcoin.mtx().addOutput(dw, 5000);
t1.addInput(di);
wdb.addTX(t1, function(err) {
assert.noError(err);
dw.getBalance(function(err, balance) {
assert.noError(err);
assert.equal(balance.toString(10), '11000');
cb();
});
});
});
it('should fill tx with inputs', function(cb) {
wdb.create({}, function(err, w1) {
assert.noError(err);