implement pruning.

This commit is contained in:
Christopher Jeffrey 2016-03-11 00:04:22 -08:00
parent 4f0289f5ab
commit 2c861aff82
5 changed files with 248 additions and 54 deletions

View File

@ -29,6 +29,8 @@ function BlockDB(node, options) {
this.options = options;
this.fsync = !!options.fsync;
this.keepBlocks = options.keepBlocks || 288;
this.prune = !!options.prune;
this.node = node;
@ -60,7 +62,23 @@ BlockDB.prototype.saveBlock = function saveBlock(block, callback) {
batch.put('t/t/' + tx.hash('hex'), tx.toExtended());
});
self.connectBlock(block, callback, batch);
self.connectBlock(block, function(err) {
if (err)
return callback(err);
if (!self.prune)
return callback(null, block);
// Check for now-fully-spent txs. Try to remove
// them or queue them up for future deletion if
// it is currently unsafe to remove them.
self._pruneBlock(block, function(err) {
if (err)
return callback(err);
return callback(null, block);
});
}, batch);
};
BlockDB.prototype.removeBlock = function removeBlock(hash, callback) {
@ -173,6 +191,8 @@ BlockDB.prototype.disconnectBlock = function disconnectBlock(hash, callback, bat
var self = this;
this._getTXBlock(hash, function(err, block) {
var height;
if (err)
return callback(err);
@ -187,7 +207,9 @@ BlockDB.prototype.disconnectBlock = function disconnectBlock(hash, callback, bat
if (typeof hash === 'string')
assert(block.hash('hex') === hash);
batch.del('b/t');
height = new Buffer(4);
utils.writeU32(height, block.height - 1, 0);
batch.put('b/t', height);
batch.del('b/h/' + pad32(block.height));
block.txs.forEach(function(tx, i) {
@ -671,7 +693,7 @@ BlockDB.prototype._getHash = function _getHash(height, callback) {
return callback(null, height);
this.db.get('b/h/' + pad32(height), function(err, hash) {
if (err)
if (err && err.type !== 'NotFoundError')
return callback(err);
if (!hash)
return callback();
@ -770,56 +792,52 @@ BlockDB.prototype._getTX = function _getTX(hash, callback) {
return this.getTX(hash);
};
BlockDB.prototype._spentTX = function _spentTX(hash, callback) {
var self = this;
this._getTX(hash, function(err, tx) {
var hash, spent;
if (err)
return callback(err);
if (!tx)
return callback(null, 0, -1);
hash = tx.hash('hex');
spent = 0;
utils.forEach(tx.outputs, function(output, next, i) {
self.isSpent(hash, i, function(err, result) {
if (err)
return next(err);
if (result)
spent++;
next();
});
});
}, function(err) {
if (err)
return callback(err);
return callback(null, spent, tx.outputs.length);
});
};
// For BIP30
// https://bitcointalk.org/index.php?topic=67738.0
BlockDB.prototype.isUnspentTX = function isUnspentTX(hash, callback) {
return this._spentTX(hash, function(err, spent, outputs) {
return this.isSpentTX(hash, function(err, spent) {
if (err)
return callback(err);
return callback(null, spent < outputs);
return callback(null, !spent);
});
};
BlockDB.prototype.isSpentTX = function isSpentTX(hash, callback) {
return this._spentTX(hash, function(err, spent, outputs) {
var spent = true;
var iter = this.db.db.iterator({
gte: 'u/t/' + hash,
lte: 'u/t/' + hash + '~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
if (err) {
return iter.end(function() {
done(err);
});
}
if (key === undefined)
return iter.end(done);
spent = false;
next();
});
})();
function done(err) {
if (err)
return callback(err);
return callback(null, spent === outputs);
});
return callback(null, spent);
}
};
BlockDB.prototype.hasTX = function hasTX(hash, callback) {
@ -908,6 +926,158 @@ BlockDB.prototype.reset = function reset(height, callback, emit) {
});
};
BlockDB.prototype._pruneBlock = function _pruneBlock(block, callback) {
var self = this;
var batch = this.batch();
// For much more aggressive pruning, we could delete
// the block headers 288 blocks before this one here as well.
return utils.forEachSerial(block.txs, function(tx, next) {
self._pruneTX(tx, batch, next);
}, function(err) {
if (err)
return callback(err);
self._pruneQueue(block, batch, function(err) {
if (err)
return callback(err);
return batch.write(callback);
});
});
};
BlockDB.prototype._pruneTX = function _pruneTX(tx, batch, callback) {
var self = this;
var watermark = tx.height - self.keepBlocks;
if (tx.isCoinbase())
return callback();
utils.forEachSerial(tx.inputs, function(input, next) {
self.isSpentTX(input.prevout.hash, function(err, result) {
var futureHeight;
if (err)
return next(err);
if (!result)
return next();
// Output's tx is below the watermark. It's not
// safe to delete yet. Queue it up to be deleted
// at a future height.
if (watermark >= 0 && input.output.height > watermark) {
futureHeight = input.output.height + watermark;
// This may screw up txs that end up being
// in a side chain in the future, but technically
// they should be safe to delete anyway at the
// future height. It's unlikely there will be
// _another_ reorg to take over 288 blocks.
batch.put('t/q/' + futureHeight + '/' + input.prevout.hash, DUMMY);
return next();
}
self._removeTX(input.prevout.hash, batch, next);
});
}, callback);
};
BlockDB.prototype._removeTX = function _removeTX(hash, batch, callback) {
var self = this;
var uniq = {};
batch.del('t/t/' + hash);
if (!self.options.indexAddress)
return callback();
this.getTX(hash, function(err, tx) {
if (err)
return callback(err);
if (!tx)
return callback();
tx.inputs.forEach(function(input) {
var address;
if (input.isCoinbase())
return;
// Since we may have pruned these outputs, we have to
// guess at the address. Should be correct 90% of the
// time, though we may leave some fluff behind. Not
// a perfect pruning, but probably good enough.
address = input.getAddress();
if (address && !uniq[address]) {
uniq[address] = true;
batch.del('t/a/' + address + '/' + hash);
}
});
tx.outputs.forEach(function(output, i) {
var address = output.getAddress();
if (address && !uniq[address]) {
uniq[address] = true;
batch.del('t/a/' + address + '/' + hash);
}
});
callback();
});
};
BlockDB.prototype._pruneQueue = function _pruneQueue(block, batch, callback) {
var self = this;
var hashes = [];
var iter = self.db.db.iterator({
gte: 't/q/' + block.height,
lte: 't/q/' + block.height + '~',
keys: true,
values: false,
fillCache: false,
keyAsBuffer: false
});
(function next() {
iter.next(function(err, key, value) {
var parts, hash, index;
if (err) {
return iter.end(function() {
done(err);
});
}
if (key === undefined)
return iter.end(done);
parts = key.split('/');
hash = parts[3];
hashes.push(hash);
next();
});
})();
function done(err) {
if (err)
return callback(err);
if (hashes.length)
utils.debug('Retroactively pruning txs at height %d', block.height);
utils.forEachSerial(hashes, function(hash, next) {
batch.del('t/q/' + block.height + '/' + hash);
self._removeTX(hash, batch, next);
}, callback);
}
};
BlockDB.prototype.batch = function batch() {
if (this.fsync)
return new utils.SyncBatch(this.db);

View File

@ -616,7 +616,7 @@ Chain.prototype._checkDuplicates = function _checkDuplicates(block, prev, callba
var hash = tx.hash('hex');
// BIP30 - Ensure there are no duplicate txids
self.blockdb.hasTX(hash, function(err, result) {
self.blockdb.isUnspentTX(hash, function(err, result) {
if (err)
return next(err);

View File

@ -46,7 +46,8 @@ Fullnode.prototype._init = function _init() {
// used for tx retrieval.
this.blockdb = new bcoin.blockdb(this, {
cache: false,
fsync: false
fsync: false,
prune: true
});
// Mempool needs access to blockdb.

View File

@ -170,8 +170,10 @@ exports.snapshot = function snapshot(name, callback) {
if (bcoin.debug) {
mem = process.memoryUsage();
utils.debug('Memory: rss=%dmb, heap=%dmb',
utils.mb(mem.rss), utils.mb(mem.heapUsed));
utils.debug('Memory: rss=%dmb, js-heap=%dmb native-heap=%dmb',
utils.mb(mem.rss),
utils.mb(mem.heapUsed),
utils.mb(mem.rss - mem.heapUsed));
}
if (!profiler)

View File

@ -1070,14 +1070,15 @@ TX.fromRaw = function fromRaw(data, enc) {
return new bcoin.tx(TX._fromRaw(data, enc));
};
TX.prototype.toExtended = function toExtended(coins) {
TX.prototype.toExtended = function toExtended(saveCoins) {
var tx = this.render();
var buf = new Buffer(tx.length + 4 + 32 + 4 + 4 + 4);
var size = tx.length + 4 + 32 + 4 + 4 + 4;
var block = this.block ? new Buffer(this.block, 'hex') : constants.zeroHash;
var height = this.height;
var index = this.index;
var changeIndex = this.changeIndex != null ? this.changeIndex : -1;
var off = 0;
var buf, coins;
if (height === -1)
height = 0x7fffffff;
@ -1088,6 +1089,27 @@ TX.prototype.toExtended = function toExtended(coins) {
if (changeIndex === -1)
changeIndex = 0x7fffffff;
if (saveCoins) {
coins = [];
size += utils.sizeIntv(this.inputs.length);
this.inputs.forEach(function(input) {
var coin;
if (!input.output) {
size += utils.sizeIntv(0);
coins.push(null);
return;
}
coin = bcoin.protocol.framer.coin(input.output);
size += utils.sizeIntv(coin.length);
size += coin.length;
coins.push(coin);
});
}
buf = new Buffer(size);
off += utils.copy(tx, buf, off);
off += utils.writeU32(buf, height, off);
off += utils.copy(block, buf, off);
@ -1096,22 +1118,21 @@ TX.prototype.toExtended = function toExtended(coins) {
off += utils.writeU32(buf, this.ps, off);
// off += utils.writeU32(buf, changeIndex, off);
if (coins) {
if (saveCoins) {
off += utils.writeIntv(buf, this.inputs.length, off);
this.inputs.forEach(function(input) {
var coin;
if (!input.output) {
coins.forEach(function(coin) {
if (!coin) {
off += utils.writeIntv(buf, 0, off);
return;
}
coin = bcoin.protocol.framer.coin(input.output);
off += utils.writeIntv(buf, coin.length, off);
off += utils.copy(coin, buf, off);
});
}
assert(off === buf.length);
buf._witnessSize = tx._witnessSize;
buf._size = tx._size;
buf._extendedSize = off;
@ -1119,7 +1140,7 @@ TX.prototype.toExtended = function toExtended(coins) {
return buf;
};
TX._fromExtended = function _fromExtended(buf, coins) {
TX._fromExtended = function _fromExtended(buf, saveCoins) {
var tx, coinCount, chunkSize, coin, i;
var off = 0;
@ -1154,7 +1175,7 @@ TX._fromExtended = function _fromExtended(buf, coins) {
if (tx.changeIndex === 0x7fffffff)
tx.changeIndex = -1;
if (coins) {
if (saveCoins) {
coinCount = utils.readIntv(buf, off);
off = coinCount.off;
coinCount = coinCount.r;