more mempool work.

This commit is contained in:
Christopher Jeffrey 2016-05-13 04:48:29 -07:00
parent ced84ca25b
commit 5a2834dcec
No known key found for this signature in database
GPG Key ID: 8962AB9DE6666BBD
2 changed files with 209 additions and 138 deletions

View File

@ -6,6 +6,7 @@
*/
var utils = require('./utils');
var assert = utils.assert;
var EventEmitter = require('events').EventEmitter;
/**
@ -69,6 +70,17 @@ LowlevelUp.prototype.open = function open(callback) {
this.once('open', callback);
};
/**
* Close the database (not recallable).
* @param {Function} callback
*/
LowlevelUp.prototype.close = function close(callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
this.loaded = false;
return this.binding.close(callback);
};
/**
* Retrieve a record from the database.
* @param {String} key
@ -77,10 +89,13 @@ LowlevelUp.prototype.open = function open(callback) {
*/
LowlevelUp.prototype.get = function get(key, options, callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
if (typeof options === 'function') {
callback = options;
options = {};
}
return this.binding.get(key, options, function(err, result) {
if (err) {
if (err.notFound || /not\s*found/i.test(err.message)) {
@ -93,15 +108,6 @@ LowlevelUp.prototype.get = function get(key, options, callback) {
});
};
/**
* Close the database (not recallable).
* @param {Function} callback
*/
LowlevelUp.prototype.close = function close(callback) {
return this.binding.close(callback);
};
/**
* Store a record in the database.
* @param {String} key
@ -111,6 +117,7 @@ LowlevelUp.prototype.close = function close(callback) {
*/
LowlevelUp.prototype.put = function put(key, value, options, callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
return this.binding.put(key, value, options, callback);
};
@ -122,6 +129,7 @@ LowlevelUp.prototype.put = function put(key, value, options, callback) {
*/
LowlevelUp.prototype.del = function del(key, options, callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
return this.binding.del(key, options, callback);
};
@ -134,8 +142,11 @@ LowlevelUp.prototype.del = function del(key, options, callback) {
*/
LowlevelUp.prototype.batch = function batch(ops, options, callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
if (!ops)
return this.binding.batch();
return this.binding.batch(ops, options, callback);
};
@ -146,6 +157,7 @@ LowlevelUp.prototype.batch = function batch(ops, options, callback) {
*/
LowlevelUp.prototype.iterator = function iterator(options) {
assert(this.loaded, 'Cannot use database before it is loaded.');
return this.db.iterator(options);
};
@ -156,6 +168,8 @@ LowlevelUp.prototype.iterator = function iterator(options) {
*/
LowlevelUp.prototype.getProperty = function getProperty(name) {
assert(this.loaded, 'Cannot use database before it is loaded.');
if (!this.binding.getProperty)
return null;
@ -170,6 +184,7 @@ LowlevelUp.prototype.getProperty = function getProperty(name) {
*/
LowlevelUp.prototype.approximateSize = function approximateSize(start, end, callback) {
assert(this.loaded, 'Cannot use database before it is loaded.');
return this.binding.approximateSize(start, end, callback);
};
@ -191,7 +206,8 @@ LowlevelUp.prototype.has = function has(key, callback) {
/**
* Get and deserialize a record with a callback.
* @param {String} key
* @param {Function} parse
* @param {Function} parse - Accepts [Buffer(data), String(key)].
* Return value should be the parsed object.
* @param {Function} callback - Returns [Error, Object].
*/
@ -253,7 +269,7 @@ LowlevelUp.prototype.iterate = function iterate(options, callback) {
if (options.values) {
if (options.parse) {
try {
value = options.parse(value);
value = options.parse(value, key);
} catch (e) {
return iter.end(function() {
return callback(e);
@ -287,7 +303,7 @@ LowlevelUp.prototype.lookup = function lookup(options, callback) {
var self = this;
var items = [];
options.values = false;
assert(!options.values, 'Cannot pass `values` into lookup.');
return this.iterate(options, function(err, keys) {
if (err)
@ -301,15 +317,12 @@ LowlevelUp.prototype.lookup = function lookup(options, callback) {
if (!value)
return next();
if (!options.parse) {
items.push(value);
return next();
}
try {
value = options.parse(value, key);
} catch (e) {
return callback(e);
if (options.parse) {
try {
value = options.parse(value, key);
} catch (e) {
return callback(e);
}
}
if (value)

View File

@ -229,72 +229,37 @@ Mempool.prototype.addBlock = function addBlock(block, callback, force) {
callback = utils.wrap(callback, unlock);
// We add the txs we haven't seen to
// the mempool first to potentially
// resolve orphans.
utils.forEachSerial(block.txs, function(entry, next) {
var hash, copy;
if (!self.chain.isFull())
return next();
utils.forEachSerial(block.txs.slice().reverse(), function(tx, next) {
var hash = tx.hash('hex');
var copy;
if (tx.isCoinbase())
return next();
hash = tx.hash('hex');
self.hasTX(hash, function(err, exists) {
self.getEntry(hash, function(err, entry) {
if (err)
return next(err);
if (exists)
return next();
if (!entry)
return self.removeOrphan(hash, next);
self.removeOrphan(hash, function(err) {
self.removeUnchecked(entry, false, function(err) {
if (err)
return next(err);
entry = MempoolEntry.fromTX(tx, block.height);
self.emit('confirmed', tx, block);
self.addUnchecked(entry, next, true);
});
return next();
}, true);
});
}, function(err) {
if (err)
return callback(err);
utils.forEachSerial(block.txs.slice().reverse(), function(tx, next) {
var hash = tx.hash('hex');
var copy;
self.blockSinceBump = true;
self.lastFeeUpdate = utils.now();
if (tx.isCoinbase())
return next();
self.getEntry(hash, function(err, entry) {
if (err)
return next(err);
if (!entry)
return self.removeOrphan(hash, next);
self.removeUnchecked(entry, false, function(err) {
if (err)
return next(err);
self.emit('confirmed', tx, block);
return next();
}, true);
});
}, function(err) {
if (err)
return callback(err);
self.blockSinceBump = true;
self.lastFeeUpdate = utils.now();
return callback();
});
return callback();
});
};
@ -342,15 +307,16 @@ Mempool.prototype.removeBlock = function removeBlock(block, callback, force) {
/**
* Ensure the size of the mempool stays below 300mb.
* @param {Hash} entryHash - TX that initiated the trim.
* @param {Function} callback
*/
Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
Mempool.prototype.limitMempoolSize = function limitMempoolSize(entryHash, callback) {
var self = this;
var rate;
var trimmed = false;
if (this.getSize() <= this.maxSize)
return callback(null, true);
return callback(null, trimmed);
this.getRange({
start: 0,
@ -361,7 +327,10 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
utils.forEachSerial(function(entry, next) {
if (self.getSize() <= self.maxSize)
return callback(null, true);
return callback(null, trimmed);
if (!trimmed)
trimmed = entry.tx.hash('hex') === entryHash;
self.removeUnchecked(entry, true, next, true);
}, function(err) {
@ -369,7 +338,7 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
return callback(err);
if (self.getSize() <= self.maxSize)
return callback(null, true);
return callback(null, trimmed);
self.getSnapshot(function(err, hashes) {
if (err)
@ -377,7 +346,7 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
utils.forEachSerial(hashes, function(hash, next) {
if (self.getSize() <= self.maxSize)
return callback(null, true);
return callback(null, trimmed);
self.getEntry(hash, function(err, entry) {
if (err)
@ -386,13 +355,16 @@ Mempool.prototype.limitMempoolSize = function limitMempoolSize(callback) {
if (!entry)
return next();
if (!trimmed)
trimmed = hash === entryHash;
self.removeUnchecked(entry, true, next, true);
});
}, function(err) {
if (err)
return callback(err);
return callback(null, self.getSize() <= self.maxSize);
return callback(null, trimmed);
});
});
});
@ -431,15 +403,9 @@ Mempool.prototype.limitOrphans = function limitOrphans(callback) {
*/
Mempool.prototype.getTX = function getTX(hash, callback) {
return this.getEntry(hash, function(err, entry) {
if (err)
return callback(err);
if (!entry)
return callback();
return callback(null, entry.tx);
});
return this.db.fetch('t/' + hash, function(data) {
return bcoin.tx.fromRaw(data);
}, callback);
};
/**
@ -450,8 +416,8 @@ Mempool.prototype.getTX = function getTX(hash, callback) {
*/
Mempool.prototype.getEntry = function getEntry(hash, callback) {
return this.db.fetch('t/' + hash, function(entry) {
return MempoolEntry.fromRaw(entry);
return this.db.fetch('t/' + hash, function(data) {
return MempoolEntry.fromRaw(data);
}, callback);
};
@ -463,8 +429,8 @@ Mempool.prototype.getEntry = function getEntry(hash, callback) {
*/
Mempool.prototype.getCoin = function getCoin(hash, index, callback) {
return this.db.fetch('c/' + hash + '/' + index, function(coin) {
coin = bcoin.coin.fromRaw(coin);
return this.db.fetch('c/' + hash + '/' + index, function(data) {
var coin = bcoin.coin.fromRaw(data);
coin.hash = hash;
coin.index = index;
return coin;
@ -482,52 +448,30 @@ Mempool.prototype.getCoin = function getCoin(hash, index, callback) {
*/
Mempool.prototype.isSpent = function isSpent(hash, index, callback) {
return this.db.fetch('s/' + hash, function(spender) {
return spender.toString('hex');
return this.db.fetch('s/' + hash + '/' + index, function(data) {
assert(data.length === 32, 'Database corruption.');
return data.toString('hex');
}, callback);
};
/**
* Find all coins pertaining to a certain address.
* @param {Base58Address} address
* @param {Base58Address|Base58Address[]} addresses
* @param {Function} callback - Returns [Error, {@link Coin}[]].
*/
Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(address, callback) {
return this.db.lookup({
gte: 'C/' + address,
lte: 'C/' + address + '~',
transform: function(key) {
key = key.split('/');
return 'c/' + key[2] + '/' + key[3];
},
parse: function(data, key) {
var coin = bcoin.coin.fromRaw(data);
var hash = key.split('/');
coin.hash = hash[1];
coin.index = +hash[2];
return coin;
}
}, callback);
Mempool.prototype.getCoinsByAddress = function getCoinsByAddress(addresses, callback) {
return this.chain.db.getCoinsByAddress.call(this, addresses, callback);
};
/**
* Find all transactions pertaining to a certain address.
* @param {Base58Address} address
* @param {Base58Address|Base58Address[]} addresses
* @param {Function} callback - Returns [Error, {@link TX}[]].
*/
Mempool.prototype.getTXByAddress = function getTXByAddress(address, callback) {
return this.db.lookup({
gte: 'T/' + address,
lte: 'T/' + address + '~',
transform: function(key) {
return 't/' + key.split('/')[2];
},
parse: function(data, key) {
return bcoin.tx.fromRaw(data);
}
}, callback);
Mempool.prototype.getTXByAddress = function getTXByAddress(addresses, callback) {
return this.chain.db.getTXByAddress.call(this, addresses, callback);
};
/**
@ -777,19 +721,24 @@ Mempool.prototype.addTX = function addTX(tx, callback, force) {
if (err)
return callback(err);
self.limitMempoolSize(function(err, result) {
self.addUnchecked(entry, function(err) {
if (err)
return callback(err);
if (!result) {
return callback(new VerifyError(tx,
'insufficientfee',
'mempool full',
0));
}
self.limitMempoolSize(hash, function(err, trimmed) {
if (err)
return callback(err);
self.addUnchecked(entry, callback, true);
});
if (trimmed) {
return callback(new VerifyError(tx,
'insufficientfee',
'mempool full',
0));
}
return callback();
});
}, true);
});
});
});
@ -834,7 +783,7 @@ Mempool.prototype.addUnchecked = function addUnchecked(entry, callback, force) {
return callback(err);
utils.forEachSerial(resolved, function(tx, next) {
entry = MempoolEntry.fromTX(tx, self.chain.height);
var entry = MempoolEntry.fromTX(tx, self.chain.height);
self.verify(entry, function(err) {
if (err) {
if (err.type === 'VerifyError') {
@ -1497,7 +1446,6 @@ Mempool.prototype.isDoubleSpend = function isDoubleSpend(tx, callback) {
}, function(err, result) {
if (err)
return callback(err);
return callback(null, !result);
});
};
@ -1747,13 +1695,26 @@ Mempool.prototype._removeUnchecked = function _removeUnchecked(hash, limit, call
});
};
/**
* Calculate the memory usage of a transaction.
* @param {TX} tx
* @returns {Number} Usage in bytes.
*/
Mempool.prototype.memUsage = function memUsage(tx) {
if (this.accurateMemory)
return this.memUsageAccurate(tx);
return this.memUsageBitcoind(tx);
};
Mempool.prototype.memUsageAccurate = function memUsage(tx) {
/**
* Calculate the memory usage of a transaction
* accurately (the amount bcoin is actually using).
* @param {TX} tx
* @returns {Number} Usage in bytes.
*/
Mempool.prototype.memUsageAccurate = function memUsageAccurate(tx) {
return 0
+ (tx.getSize() + 4 + 32 + 4 + 4 + 4) // extended
+ (2 + 64) // t
@ -1762,7 +1723,18 @@ Mempool.prototype.memUsageAccurate = function memUsage(tx) {
+ (tx.outputs.length * (2 + 64 + 1 + 2 + 80)); // c
};
Mempool.prototype.memUsageBitcoind = function memUsage(tx) {
/**
* Calculate the memory usage of a transaction based on
* bitcoind's memory estimation algorithm. This will
* _not_ be accurate to bcoin's actual memory usage,
* but it helps accurately replicate the bitcoind
* mempool.
* @see DynamicMemoryUsage()
* @param {TX} tx
* @returns {Number} Usage in bytes.
*/
Mempool.prototype.memUsageBitcoind = function memUsageBitcoind(tx) {
var mem = 0;
var i, input;
@ -1787,6 +1759,12 @@ Mempool.prototype.memUsageBitcoind = function memUsage(tx) {
return mem;
};
/**
* Calculate the memory usage of the entire mempool.
* @see DynamicMemoryUsage()
* @returns {Number} Usage in bytes.
*/
Mempool.prototype.getSize = function getSize() {
if (this.accurateMemory)
return this.size;
@ -1798,18 +1776,48 @@ Mempool.prototype.getSize = function getSize() {
+ this.size;
};
/**
* Represents a mempool entry.
* @exports MempoolEntry
* @constructor
* @param {Object} options
* @param {TX} options.tx - Transaction in mempool.
* @param {Number} options.height - Entry height.
* @param {BN} options.priority - Entry priority.
* @param {Number} options.ts - Entry time.
* @param {BN} options.chainValue - Value of on-chain coins.
* @param {Number} options.count - Number of descendants (includes tx).
* @param {Number} options.size - TX and descendant modified size.
* @param {BN} options.fees - TX and descendant delta-applied fees.
* @property {TX} tx
* @property {Number} height
* @property {BN} priority
* @property {Number} ts
* @property {BN} chainValue
* @property {Number} count
* @property {Number} size
* @property {BN} fees
*/
function MempoolEntry(options) {
this.tx = options.tx;
this.height = options.height;
this.priority = options.priority;
this.chainValue = options.chainValue;
this.ts = options.ts;
this.chainValue = options.chainValue;
this.count = options.count;
this.size = options.size;
this.fees = options.fees;
}
/**
* Create a mempool entry from a TX.
* @param {TX} tx
* @param {Number} height - Entry height.
* @returns {MempoolEntry}
*/
MempoolEntry.fromTX = function fromTX(tx, height) {
var data = tx.getPriority(height);
@ -1825,6 +1833,16 @@ MempoolEntry.fromTX = function fromTX(tx, height) {
});
};
/**
* Serialize a mempool entry. Note that this
* can still be parsed as a regular tx since
* the mempool entry data comes after the
* serialized transaction.
* @param {TX} tx
* @param {Number} height - Entry height.
* @returns {MempoolEntry}
*/
MempoolEntry.prototype.toRaw = function toRaw() {
var p = new BufferWriter();
bcoin.protocol.framer.tx(this.tx, p);
@ -1838,7 +1856,13 @@ MempoolEntry.prototype.toRaw = function toRaw() {
return p.render();
};
MempoolEntry.fromRaw = function fromRaw(data, saveCoins) {
/**
* Create a mempool entry from serialized data.
* @param {Buffer|BufferReader} data
* @returns {MempoolEntry}
*/
MempoolEntry.fromRaw = function fromRaw(data) {
var p = new BufferReader(data);
return new MempoolEntry({
tx: bcoin.tx.fromRaw(p),
@ -1852,6 +1876,14 @@ MempoolEntry.fromRaw = function fromRaw(data, saveCoins) {
});
};
/**
* Calculate priority, taking into account
* the entry height delta, modified size,
* and chain value.
* @param {Number} height
* @returns {BN} Priority.
*/
MempoolEntry.prototype.getPriority = function getPriority(height) {
var heightDelta = Math.max(0, height - this.height);
var modSize = this.tx.getModifiedSize(this.size);
@ -1862,6 +1894,14 @@ MempoolEntry.prototype.getPriority = function getPriority(height) {
return result;
};
/**
* Test whether the entry is free with
* the current priority (calculated by
* current height).
* @param {Number} height
* @returns {Boolean}
*/
MempoolEntry.prototype.isFree = function isFree(height) {
var priority = this.getPriority(height);
return priority.cmp(constants.tx.FREE_THRESHOLD) > 0;
@ -1871,13 +1911,29 @@ MempoolEntry.prototype.isFree = function isFree(height) {
* Helpers
*/
// Assume 64 bit for arm since version
// number is not exposed by node.js.
/**
* "Guessed" pointer size based on ISA. This
* assumes 64 bit for arm since the arm
* version number is not exposed by node.js.
* @const {Number}
*/
var ptrSize = (process.platform == null
|| process.platform === 'x64'
|| process.platform === 'ia64'
|| process.platform === 'arm') ? 8 : 4;
/**
* Calculate malloc usage based on pointer size.
* If you're scratching your head as to why this
* function is here, it is only here to accurately
* replicate bitcoind's memory usage algorithm.
* (I know javascript doesn't have malloc or
* pointers).
* @param {Number} alloc - Size of Buffer object.
* @returns {Number} Allocated size.
*/
function mallocUsage(alloc) {
if (alloc === 0)
return 0;
@ -1886,5 +1942,7 @@ function mallocUsage(alloc) {
return ((alloc + 15) >>> 3) << 3;
}
Mempool.mempoolentry = MempoolEntry;
return Mempool;
};