mempool: rewrite.

This commit is contained in:
Christopher Jeffrey 2016-08-15 15:46:37 -07:00
parent c794c17077
commit c8bc9fb8b6
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
9 changed files with 727 additions and 870 deletions

View File

@ -171,51 +171,42 @@ CompactBlock.prototype.toRequest = function toRequest() {
return TXRequest.fromCompact(this);
};
CompactBlock.prototype.fillMempool = function fillMempool(mempool, callback) {
var self = this;
CompactBlock.prototype.fillMempool = function fillMempool(mempool) {
var have = {};
var id, index;
var hashes = mempool.getSnapshot();
var i, id, index, hash, tx;
mempool.getSnapshot(function(err, hashes) {
if (err)
return callback(err);
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
id = this.sid(hash);
index = this.idMap[id];
utils.forEachSerial(hashes, function(hash, next) {
id = self.sid(hash);
index = self.idMap[id];
if (index == null)
continue;
if (index == null)
return next();
if (have[index]) {
// Siphash collision, just request it.
this.available[index] = null;
this.count--;
continue;
}
if (have[index]) {
// Siphash collision, just request it.
self.available[index] = null;
self.count--;
return next();
}
tx = mempool.getTX(hash);
mempool.getTX(hash, function(err, tx) {
if (err)
return callback(err);
if (!tx)
continue;
// Race condition: tx
// fell out of mempool.
if (!tx)
return next();
this.available[index] = tx;
have[index] = true;
this.count++;
self.available[index] = tx;
have[index] = true;
self.count++;
// We actually may have a siphash collision
// here, but exit early anyway for perf.
if (this.count === this.totalTX)
return true;
}
// We actually may have a siphash collision
// here, but exit early anyway for perf.
if (self.count === self.totalTX)
return callback(null, true);
next();
});
}, callback);
});
return false;
};
CompactBlock.prototype.fillMissing = function fillMissing(res) {

View File

@ -483,32 +483,15 @@ Fullnode.prototype.getFullBlock = function getFullBlock(hash, callback) {
*/
Fullnode.prototype.getCoin = function getCoin(hash, index, callback) {
var self = this;
this.mempool.getCoin(hash, index, function(err, coin) {
if (err)
return callback(err);
var coin = this.mempool.getCoin(hash, index);
if (coin)
return callback(null, coin);
if (coin)
return callback(null, coin);
self.chain.db.getCoin(hash, index, function(err, coin) {
if (err)
return callback(err);
if (this.mempool.isSpent(hash, index))
return callback();
if (!coin)
return callback();
self.mempool.isSpent(hash, index, function(err, spent) {
if (err)
return callback(err);
if (spent)
return callback();
return callback(null, coin);
});
});
});
this.chain.db.getCoin(hash, index, callback);
};
/**
@ -520,29 +503,23 @@ Fullnode.prototype.getCoin = function getCoin(hash, index, callback) {
Fullnode.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, callback) {
var self = this;
this.mempool.getCoinsByAddress(addresses, function(err, coins) {
var coins = this.mempool.getCoinsByAddress(addresses);
this.chain.db.getCoinsByAddress(addresses, function(err, blockCoins) {
if (err)
return callback(err);
self.chain.db.getCoinsByAddress(addresses, function(err, blockCoins) {
utils.forEach(blockCoins, function(coin, next) {
var spent = self.mempool.isSpent(coin.hash, coin.index);
if (!spent)
coins.push(coin);
return next();
}, function(err) {
if (err)
return callback(err);
utils.forEach(blockCoins, function(coin, next) {
self.mempool.isSpent(coin.hash, coin.index, function(err, spent) {
if (err)
return callback(err);
if (!spent)
coins.push(coin);
return next();
});
}, function(err) {
if (err)
return callback(err);
return callback(null, coins);
});
return callback(null, coins);
});
});
};
@ -554,25 +531,12 @@ Fullnode.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, cal
*/
Fullnode.prototype.getTX = function getTX(hash, callback) {
var self = this;
var tx = this.mempool.getTX(hash);
this.mempool.getTX(hash, function(err, tx) {
if (err)
return callback(err);
if (tx)
return callback(null, tx);
if (tx)
return callback(null, tx);
self.chain.db.getTX(hash, function(err, tx) {
if (err)
return callback(err);
if (!tx)
return callback();
return callback(null, tx);
});
});
this.chain.db.getTX(hash, callback);
};
/**
@ -582,17 +546,10 @@ Fullnode.prototype.getTX = function getTX(hash, callback) {
*/
Fullnode.prototype.hasTX = function hasTX(hash, callback) {
var self = this;
if (this.mempool.hasTX(hash))
return callback(null, true);
this.mempool.hasTX(hash, function(err, result) {
if (err)
return callback(err);
if (result)
return callback(null, true);
self.chain.db.hasTX(hash, callback);
});
this.chain.db.hasTX(hash, callback);
};
/**
@ -603,17 +560,10 @@ Fullnode.prototype.hasTX = function hasTX(hash, callback) {
*/
Fullnode.prototype.isSpent = function isSpent(hash, index, callback) {
var self = this;
if (this.mempool.isSpent(hash, index))
return callback(null, true);
this.mempool.isSpent(hash, index, function(err, spent) {
if (err)
return callback(err);
if (spent)
return callback(null, true);
self.chain.db.isSpent(hash, index, callback);
});
this.chain.db.isSpent(hash, index, callback);
};
/**
@ -624,18 +574,13 @@ Fullnode.prototype.isSpent = function isSpent(hash, index, callback) {
*/
Fullnode.prototype.getTXByAddress = function getTXByAddress(addresses, callback) {
var self = this;
var mempool = this.mempool.getTXByAddress(addresses);
this.mempool.getTXByAddress(addresses, function(err, mempool) {
this.chain.db.getTXByAddress(addresses, function(err, txs) {
if (err)
return callback(err);
self.chain.db.getTXByAddress(addresses, function(err, txs) {
if (err)
return callback(err);
return callback(null, mempool.concat(txs));
});
return callback(null, mempool.concat(txs));
});
};

View File

@ -1094,50 +1094,41 @@ RPC.prototype.getrawmempool = function getrawmempool(args, callback) {
RPC.prototype.mempoolToJSON = function mempoolToJSON(verbose, callback) {
var self = this;
var out = {};
var tx;
var i, tx, hashes, hash, entry;
if (verbose) {
return this.mempool.getSnapshot(function(err, hashes) {
if (err)
return callback(err);
hashes = this.mempool.getSnapshot();
utils.forEachSerial(hashes, function(hash, next) {
self.mempool.getEntry(hash, function(err, entry) {
if (err)
return callback(err);
for (i = 0; i < hashes.length; i++) {
hash = hashes[i];
entry = this.mempool.getEntry(hash);
tx = entry.tx;
if (!entry)
continue;
out[tx.rhash] = {
size: entry.size,
fee: entry.fee,
modifiedfee: entry.fees,
time: entry.ts,
height: entry.height,
startingpriority: entry.priority,
currentpriority: entry.getPriority(self.chain.height),
descendantcount: 0,
descendantsize: entry.size,
descendantfees: entry.fees,
depends: []
};
tx = entry.tx;
next();
});
}, function(err) {
if (err)
return callback(err);
return callback(null, out);
});
});
out[tx.rhash] = {
size: entry.size,
fee: entry.fee,
modifiedfee: entry.fees,
time: entry.ts,
height: entry.height,
startingpriority: entry.priority,
currentpriority: entry.getPriority(self.chain.height),
descendantcount: 0,
descendantsize: entry.size,
descendantfees: entry.fees,
depends: []
};
}
return callback(null, out);
}
this.mempool.getSnapshot(function(err, hashes) {
if (err)
return callback(err);
hashes = this.mempool.getSnapshot();
return callback(null, hashes.map(utils.revHex));
});
return callback(null, hashes.map(utils.revHex));
};
RPC.prototype.gettxout = function gettxout(args, callback) {
@ -1573,7 +1564,7 @@ RPC.prototype.getnetworkhashps = function getnetworkhashps(args, callback) {
RPC.prototype.prioritisetransaction = function prioritisetransaction(args, callback) {
var self = this;
var hash, pri, fee;
var hash, pri, fee, entry;
if (args.help || args.length !== 3) {
return callback(new RPCError('prioritisetransaction'
@ -1592,35 +1583,32 @@ RPC.prototype.prioritisetransaction = function prioritisetransaction(args, callb
hash = utils.revHex(hash);
this.mempool.getEntry(hash, function(err, entry) {
entry = this.mempool.getEntry(hash);
if (!entry)
return callback(new RPCError('Transaction not in mempool.'));
entry.priority += pri;
entry.fees += fee;
if (entry.priority < 0)
entry.priority = 0;
if (entry.fees < 0)
entry.fees = 0;
this.mempool.fillAllCoins(entry.tx, function(err) {
if (err)
return callback(err);
if (!entry)
if (!entry.tx.hasCoins())
return callback(new RPCError('Transaction not in mempool.'));
entry.priority += pri;
entry.fees += fee;
if (entry.priority < 0)
entry.priority = 0;
if (entry.fees < 0)
entry.fees = 0;
this.mempool.fillAllCoins(entry.tx, function(err) {
self.mempool.addUnchecked(entry, function(err) {
if (err)
return callback(err);
if (!entry.tx.hasCoins())
return callback(new RPCError('Transaction not in mempool.'));
self.mempool.addUnchecked(entry, function(err) {
if (err)
return callback(err);
callback(null, true);
});
callback(null, true);
});
});
};

File diff suppressed because it is too large Load Diff

View File

@ -1124,7 +1124,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
if (!payload.mempool)
return callback();
self.mempool.getCoin(hash, index, callback);
callback(null, self.mempool.getCoin(hash, index));
}
function isSpent(hash, index, callback) {
@ -1134,7 +1134,7 @@ Peer.prototype._handleGetUTXOs = function _handleGetUTXOs(payload) {
if (!payload.mempool)
return callback(null, false);
self.mempool.isSpent(hash, index, callback);
callback(null, self.mempool.isSpent(hash, index));
}
if (payload.prevout.length > 15)
@ -1427,7 +1427,7 @@ Peer.prototype._handleVersion = function _handleVersion(version) {
Peer.prototype._handleMempool = function _handleMempool() {
var self = this;
var items = [];
var i;
var i, hashes;
var unlock = this.locker.lock(_handleMempool, []);
if (!unlock)
@ -1450,18 +1450,14 @@ Peer.prototype._handleMempool = function _handleMempool() {
if (this.pool.options.selfish)
return done();
this.mempool.getSnapshot(function(err, hashes) {
if (err)
return done(err);
hashes = this.mempool.getSnapshot();
for (i = 0; i < hashes.length; i++)
items.push(new InvItem(constants.inv.TX, hashes[i]));
for (i = 0; i < hashes.length; i++)
items.push(new InvItem(constants.inv.TX, hashes[i]));
self.logger.debug('Sending mempool snapshot (%s).', self.hostname);
self.logger.debug('Sending mempool snapshot (%s).', self.hostname);
self.sendInv(items);
done();
});
self.sendInv(items);
};
/**
@ -1494,7 +1490,7 @@ Peer.prototype._getItem = function _getItem(item, callback) {
if (item.isTX()) {
if (!this.mempool)
return callback();
return this.mempool.getEntry(item.hash, callback);
return callback(null, this.mempool.getEntry(item.hash));
}
if (this.chain.db.options.spv)
@ -1941,6 +1937,7 @@ Peer.prototype._handleSendCmpct = function _handleSendCmpct(payload) {
Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) {
var self = this;
var hash = block.hash('hex');
var result;
function done(err) {
if (err) {
@ -1970,31 +1967,28 @@ Peer.prototype._handleCmpctBlock = function _handleCmpctBlock(block) {
// Sort of a lock too.
this.compactBlocks[hash] = block;
block.fillMempool(this.mempool, function(err, result) {
if (err)
return done(err);
result = block.fillMempool(this.mempool);
if (result) {
delete self.compactBlocks[hash];
self.emit('block', block.toBlock());
self.logger.debug(
'Received full compact block %s (%s).',
block.rhash, self.hostname);
return;
}
if (result) {
delete this.compactBlocks[hash];
this.fire('block', block.toBlock());
this.logger.debug(
'Received full compact block %s (%s).',
block.rhash, this.hostname);
return;
}
self.write(self.framer.getBlockTxn(block.toRequest()));
this.write(this.framer.getBlockTxn(block.toRequest()));
this.logger.debug(
'Received semi-full compact block %s (%s).',
block.rhash, this.hostname);
block.startTimeout(10000, function() {
self.logger.debug(
'Received semi-full compact block %s (%s).',
'Compact block timed out: %s (%s).',
block.rhash, self.hostname);
block.startTimeout(10000, function() {
self.logger.debug(
'Compact block timed out: %s (%s).',
block.rhash, self.hostname);
delete self.compactBlocks[hash];
});
delete self.compactBlocks[hash];
});
};

View File

@ -1563,7 +1563,7 @@ Pool.prototype.has = function has(type, hash, force, callback) {
}
// Check the mempool.
return this.mempool.has(hash, check);
return check(null, this.mempool.has(hash));
}
// Check the chain.

View File

@ -37,6 +37,9 @@ function RBT(location, options) {
if (!options)
options = {};
if (typeof options === 'function')
options = { compare: options };
this.options = options;
this.root = SENTINEL;
this.compare = options.compare || utils.cmp;

View File

@ -224,23 +224,21 @@ describe('Block', function() {
var fakeMempool = {
getSnapshot: function(callback) {
callback(null, Object.keys(map));
return Object.keys(map);
},
getTX: function(hash, callback) {
callback(null, map[hash]);
return map[hash];
}
};
assert.equal(cblock.sid(block.txs[1].hash()), 125673511480291);
cblock.fillMempool(fakeMempool, function(err, result) {
assert.ifError(err);
assert(result);
for (var i = 0; i < cblock.available.length; i++)
assert(cblock.available[i]);
assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex'));
cb();
});
var result = cblock.fillMempool(fakeMempool);
assert(result);
for (var i = 0; i < cblock.available.length; i++)
assert(cblock.available[i]);
assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex'));
cb();
});
it('should handle half-full compact block', function(cb) {
@ -262,39 +260,37 @@ describe('Block', function() {
var fakeMempool = {
getSnapshot: function(callback) {
callback(null, keys);
return keys;
},
getTX: function(hash, callback) {
callback(null, map[hash]);
return map[hash];
}
};
assert.equal(cblock.sid(block.txs[1].hash()), 125673511480291);
cblock.fillMempool(fakeMempool, function(err, result) {
assert.ifError(err);
assert(!result);
var result = cblock.fillMempool(fakeMempool);
assert(!result);
var req = cblock.toRequest();
assert.equal(req.hash, cblock.hash('hex'));
assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]);
var req = cblock.toRequest();
assert.equal(req.hash, cblock.hash('hex'));
assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]);
req = bip152.TXRequest.fromRaw(req.toRaw());
assert.equal(req.hash, cblock.hash('hex'));
assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]);
req = bip152.TXRequest.fromRaw(req.toRaw());
assert.equal(req.hash, cblock.hash('hex'));
assert.deepEqual(req.indexes, [5, 6, 7, 8, 9]);
var res = bip152.TXResponse.fromBlock(block, req);
res = bip152.TXResponse.fromRaw(res.toRaw());
var res = bip152.TXResponse.fromBlock(block, req);
res = bip152.TXResponse.fromRaw(res.toRaw());
var result = cblock.fillMissing(res);
assert(result);
var result = cblock.fillMissing(res);
assert(result);
for (var i = 0; i < cblock.available.length; i++)
assert(cblock.available[i]);
for (var i = 0; i < cblock.available.length; i++)
assert(cblock.available[i]);
assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex'));
assert.equal(cblock.toBlock().toRaw().toString('hex'), block.toRaw().toString('hex'));
cb();
});
cb();
});
});

View File

@ -126,41 +126,30 @@ describe('Mempool', function() {
assert.ifError(err);
mempool.addTX(t4, function(err) {
assert.ifError(err);
mempool.getBalance(function(err, balance) {
var balance = mempool.getBalance();
assert.equal(balance, 0);
mempool.addTX(t1, function(err) {
assert.ifError(err);
assert.equal(balance.total, 0);
mempool.addTX(t1, function(err) {
var balance = mempool.getBalance();
assert.equal(balance, 60000);
mempool.addTX(t2, function(err) {
assert.ifError(err);
mempool.getBalance(function(err, balance) {
var balance = mempool.getBalance();
assert.equal(balance, 50000);
mempool.addTX(t3, function(err) {
assert.ifError(err);
assert.equal(balance.total, 60000);
mempool.addTX(t2, function(err) {
var balance = mempool.getBalance();
assert.equal(balance, 22000);
mempool.addTX(f1, function(err) {
assert.ifError(err);
mempool.getBalance(function(err, balance) {
assert.ifError(err);
assert.equal(balance.total, 50000);
mempool.addTX(t3, function(err) {
assert.ifError(err);
mempool.getBalance(function(err, balance) {
assert.ifError(err);
assert.equal(balance.total, 22000);
mempool.addTX(f1, function(err) {
assert.ifError(err);
mempool.getBalance(function(err, balance) {
assert.ifError(err);
assert.equal(balance.total, 20000);
mempool.getHistory(function(err, txs) {
assert(txs.some(function(tx) {
return tx.hash('hex') === f1.hash('hex');
}));
var balance = mempool.getBalance();
assert.equal(balance, 20000);
var txs = mempool.getHistory();
assert(txs.some(function(tx) {
return tx.hash('hex') === f1.hash('hex');
}));
cb();
});
});
});
});
});
});
cb();
});
});
});