better pruning. data store improvements.

This commit is contained in:
Christopher Jeffrey 2016-03-12 04:59:56 -08:00
parent f79d2cfa89
commit bb306c3547
2 changed files with 113 additions and 72 deletions

View File

@ -1216,8 +1216,15 @@ ChainDB.prototype.hasBlock = function hasBlock(hash, callback) {
var self = this;
var id = 'b/b/' + hash;
if (typeof hash === 'number')
id = 'b/h/' + pad32(hash);
if (typeof hash === 'number') {
return this.getHash(hash, function(err, hash) {
if (err)
return callback(err);
if (!hash)
return callback();
return self.hasBlock(hash, callback);
});
}
this.db.get(id, function(err, data) {
if (err && err.type !== 'NotFoundError')
@ -1330,6 +1337,7 @@ ChainDB.prototype._pruneBlock = function _pruneBlock(block, callback) {
// For much more aggressive pruning, we could delete
// the block headers 288 blocks before this one here as well.
batch.put('b/q/' + pad32(block.height + self.keepBlocks), block.hash());
return utils.forEachSerial(block.txs, function(tx, next) {
self._pruneTX(tx, batch, next);
@ -1383,6 +1391,26 @@ ChainDB.prototype._pruneTX = function _pruneTX(tx, batch, callback) {
}, callback);
};
ChainDB.prototype._pruneTX = function _pruneTX(tx, batch, callback) {
var self = this;
batch.put(
't/q/' + pad32(tx.height + self.keepBlocks) + '/' + tx.hash('hex'),
DUMMY);
return callback();
if (tx.isCoinbase())
return callback();
tx.inputs.forEach(function(input) {
batch.put(
't/q/' + pad32(tx.height + self.keepBlocks) + '/' + input.prevout.hash,
DUMMY);
});
callback();
};
ChainDB.prototype._removeTX = function _removeTX(hash, batch, callback) {
var self = this;
var uniq = {};
@ -1468,13 +1496,23 @@ ChainDB.prototype._pruneQueue = function _pruneQueue(block, batch, callback) {
if (err)
return callback(err);
if (hashes.length)
utils.debug('Retroactively pruning txs at height %d.', block.height);
// if (hashes.length)
// utils.debug('Retroactively pruning txs at height %d.', block.height);
utils.forEachSerial(hashes, function(hash, next) {
batch.del('t/q/' + pad32(block.height) + '/' + hash);
self._removeTX(hash, batch, next);
}, callback);
self.db.get('b/q/' + pad32(block.height), function(err, hash) {
if (err && err.type !== 'NotFoundError')
return callback(err);
if (hash) {
batch.del('b/q/' + pad32(block.height));
batch.del('b/b/' + utils.toHex(hash));
}
utils.forEachSerial(hashes, function(hash, next) {
batch.del('t/q/' + pad32(block.height) + '/' + hash);
self._removeTX(hash, batch, next);
}, callback);
});
}
};

View File

@ -14,8 +14,10 @@ var assert = utils.assert;
var fs = bcoin.fs;
var pad32 = utils.pad32;
var MAX_FILE_SIZE = 512 * 1024 * 1024;
var MAX_FILE_SIZE = 1 * 1024 * 1024;
var nullBuffer = new Buffer([0xff, 0xff, 0xff, 0xff]);
var MAX_FILE_SIZE = 100 * 1024;
var NULL_CHUNK = new Buffer([0xff, 0xff, 0xff, 0xff]);
/**
* DataStore
@ -38,18 +40,21 @@ function DataStore(db, options) {
if (!this.dir)
this.dir = bcoin.prefix + '/store-' + network.type + '.db';
// Keep a pool of FDs open for disk cache benefits
this._db = db;
this.db = this;
this.busy = false;
this.jobs = [];
this.fileIndex = -1;
// Keep a pool of FDs open for disk cache benefits
this.pool = new bcoin.lru(50, 1, function(key, value) {
fs.close(value.fd, function(err) {
if (err)
self.emit('error', err);
});
});
this.busy = false;
this.jobs = [];
this.fileIndex = -1;
this._init();
}
@ -206,7 +211,11 @@ DataStore.prototype.getData = function get(off, callback) {
this.openFile(off.fileIndex, function(err, fd, fsize) {
if (err)
return callback(err);
return self.read(fd, off.offset, off.size, callback);
return self.read(fd, off.offset, off.size, function(err, data) {
if (err)
return callback(err);
return callback(null, data);
});
});
};
@ -215,28 +224,35 @@ DataStore.prototype.get = function get(key, callback) {
return this._db.get(key, function(err, offset) {
if (err)
return callback(err);
if (isDirect(offset))
if (isDirect(key, offset))
return callback(null, offset);
return self.getData(offset, callback);
});
};
DataStore.prototype.batch = function batch(ops, options, callback) {
var batch = new Batch(this);
var batch;
if (!callback) {
callback = options;
options = null;
}
if (!options)
options = {};
batch = new Batch(this, options);
if (ops) {
if (!callback) {
callback = options;
options = null;
}
if (!options)
options = {};
batch.ops = ops;
return batch.write(callback);
}
return batch;
};
function Batch(store) {
function Batch(store, options) {
this.options = options;
this.ops = [];
this.store = store;
this._db = store._db;
@ -257,11 +273,16 @@ Batch.prototype.write = function write(callback) {
if (!this._db)
return callback(new Error('Already written.'));
batch = this._db.batch();
batch = this.options.sync
? utils.syncBatch(this._db)
: this._db.batch();
if (this.options.sync)
this._db.fsync = true;
utils.forEachSerial(this.ops, function(op, next) {
if (op.type === 'put') {
if (isDirect(op.value)) {
if (isDirect(op.key, op.value)) {
batch.put(op.key, op.value);
return next();
}
@ -280,7 +301,7 @@ Batch.prototype.write = function write(callback) {
if (!offset)
return next();
batch.del(op.key);
if (isDirect(offset))
if (isDirect(op.key, offset))
return next();
self.store.delData(offset, next);
});
@ -318,8 +339,9 @@ Iterator.prototype.seek = function seek(key) {
// Store coins, chain entries, dummies, lookup
// hashes directly in the db (unless they're
// the same length as offset).
function isDirect(data) {
return data.length !== 12 && data.length <= 116;
function isDirect(key, data) {
return !/^(b\/b\/|t\/t\/)/.test(key);
return data.length !== 12 && (data.length <= 87 || data.length === 116);
}
Iterator.prototype.next = function next(callback) {
@ -328,7 +350,7 @@ Iterator.prototype.next = function next(callback) {
return callback(err);
if (value) {
if (isDirect(value))
if (isDirect(key, value))
return callback(null, key, value);
return self.getData(value, function(err, data) {
if (err)
@ -359,7 +381,7 @@ utils.wrap = function wrap(callback, unlock) {
DataStore.prototype.putData = function putData(data, callback, force) {
var self = this;
var offset, undo;
var offset;
var unlock = this._lock(putData, [data, callback], force);
if (!unlock)
@ -412,7 +434,7 @@ DataStore.prototype.put = function put(key, value, callback) {
callback = utils.wrap(callback, unlock);
if (isDirect(value))
if (isDirect(key, value))
return self._db.put(key, value, callback);
return this.putData(value, function(err, offset) {
@ -425,7 +447,6 @@ DataStore.prototype.put = function put(key, value, callback) {
DataStore.prototype.readUndo = function readUndo(index, offset, callback) {
var self = this;
var undo;
return this.openFile(index, function(err, fd, fsize) {
if (err)
@ -440,26 +461,9 @@ DataStore.prototype.readUndo = function readUndo(index, offset, callback) {
});
};
DataStore.prototype.isNull = function isNull(index, offset, size, callback) {
var self = this;
var undo;
return this.openFile(index, function(err, fd, fsize) {
if (err)
return callback(err);
return self.read(fd, offset, 4, function(err, data) {
if (err)
return callback(err);
return callback(null, utils.readU32(data, 0) === 0xffffffff);
});
});
};
DataStore.prototype.delData = function delData(off, callback, force) {
var self = this;
var undo, index, offset, size;
var index, offset, size;
var unlock = this._lock(delData, [off, callback], force);
if (!unlock)
@ -476,45 +480,44 @@ DataStore.prototype.delData = function delData(off, callback, force) {
if (err)
return callback(err);
return self.write(fd, offset, nullBuffer, function(err, data) {
// Overwrite the "fileIndex" in the undo chunk
return self.write(fd, offset + size, NULL_CHUNK, function(err) {
if (err)
return callback(err);
if (offset + size !== fsize)
if (offset + size + 12 !== fsize)
return callback();
utils.debug('Pruning block files...');
utils.debug({ offset: offset, size: size, total: offset + size, fsize: fsize });
// If we're deleting the last record, traverse
// through the reverse linked list of undo offsets
// until we hit a record that isn't deleted.
// Truncate to the last deleted record's offset.
(function next() {
if (offset === 0)
return done();
self.readUndo(index, offset, function(err, undo) {
if (err)
return callback(err);
self.isNull(index, undo.offset, function(err, res) {
if (!res)
return done();
offset = undo.offset;
if (offset === 0)
return done();
return next();
});
if (undo.fileIndex !== 0xffffffff)
return done();
offset = undo.offset;
if (offset === 0)
return done();
return next();
});
})();
function done() {
// Delete the file if nothing is in it.
utils.debug('Truncating to %d', offset);
if (offset === 0) {
return fs.close(fd, function(err) {
if (err)
return callback(err);
self.pool.remove(index);
fs.unlink(self.dir + '/f' + pad32(index), callback);
});
self.pool.remove(index);
return fs.unlink(self.dir + '/f' + pad32(index), callback);
}
if (self.pool.has(index))
self.pool.get(index).size = offset;
self.truncate(index, offset, callback);
self.truncate(index, offset + 12, callback);
}
});
});
@ -533,8 +536,8 @@ DataStore.prototype.del = function del(key, callback, force) {
if (err && err.type !== 'NotFoundError')
return callback(err);
if (!off)
return next();
if (isDirect(off))
return callback();
if (isDirect(key, off))
return self._db.del(key, callback);
self.delData(off, function(err) {
if (err)
@ -565,7 +568,7 @@ DataStore.prototype.truncate = function truncate(index, size, callback) {
callback = utils.ensure(callback);
self.openFile(index, function(err, fd, fsize) {
this.openFile(index, function(err, fd, fsize) {
if (err)
return callback(err);